Java多线程父线程向子线程传值解决方案 1

简介: Java多线程父线程向子线程传值解决方案

1 背景

在实际开发过程中我们需要父子之间传递一些数据,比如用户信息,日志异步生成数据传递等,该文章从5种解决方案解决父子之间数据传递困扰

2 ThreadLocal+TaskDecorator

用户工具类 UserUtils

/**
 *使用ThreadLocal存储共享的数据变量,如登录的用户信息
 */
public class UserUtils {
    private static  final  ThreadLocal<String> userLocal=new ThreadLocal<>();
    public static  String getUserId(){
        return userLocal.get();
    }
    public static void setUserId(String userId){
        userLocal.set(userId);
    }
    public static void clear(){
        userLocal.remove();
    }
}

自定义CustomTaskDecorator

/**
 * 线程池修饰类
 */
public class CustomTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // 获取主线程中的请求信息(我们的用户信息也放在里面)
        String robotId = UserUtils.getUserId();
        System.out.println(robotId);
        return () -> {
            try {
                // 将主线程的请求信息,设置到子线程中
                UserUtils.setUserId(robotId);
                // 执行子线程,这一步不要忘了
                runnable.run();
            } finally {
                // 线程结束,清空这些信息,否则可能造成内存泄漏
                UserUtils.clear();
            }
        };
    }
}

ExecutorConfig

在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());

@Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor----------------");
        //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //使用可视化运行状态的线程池
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //增加线程池修饰类
        executor.setTaskDecorator(new CustomTaskDecorator());
        //增加MDC的线程池修饰类
        //executor.setTaskDecorator(new MDCTaskDecorator());
        //执行初始化
        executor.initialize();
        log.info("end asyncServiceExecutor------------");
        return executor;
    }

AsyncServiceImpl

    /**
     * 使用ThreadLocal方式传递
     * 带有返回值
     * @throws InterruptedException
     */
    @Async("asyncServiceExecutor")
    public CompletableFuture<String> executeValueAsync2() throws InterruptedException {
        log.info("start executeValueAsync");
        System.out.println("异步线程执行返回结果......+");
        log.info("end executeValueAsync");
        return CompletableFuture.completedFuture(UserUtils.getUserId());
    }

Test2Controller

    /**
     * 使用ThreadLocal+TaskDecorator的方式
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @GetMapping("/test2")
    public String test2() throws InterruptedException, ExecutionException {
        UserUtils.setUserId("123456");
        CompletableFuture<String> completableFuture = asyncService.executeValueAsync2();
        String s = completableFuture.get();
        return s;
    }

3 RequestContextHolder+TaskDecorator

自定义CustomTaskDecorator

/**
 * 线程池修饰类
 */
public class CustomTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // 获取主线程中的请求信息(我们的用户信息也放在里面)
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        return () -> {
            try {
                // 将主线程的请求信息,设置到子线程中
                RequestContextHolder.setRequestAttributes(attributes);
                // 执行子线程,这一步不要忘了
                runnable.run();
            } finally {
                // 线程结束,清空这些信息,否则可能造成内存泄漏
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

ExecutorConfig

在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());

@Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor----------------");
        //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //使用可视化运行状态的线程池
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //增加线程池修饰类
        executor.setTaskDecorator(new CustomTaskDecorator());
        //增加MDC的线程池修饰类
        //executor.setTaskDecorator(new MDCTaskDecorator());
        //执行初始化
        executor.initialize();
        log.info("end asyncServiceExecutor------------");
        return executor;
    }

AsyncServiceImpl

     /**
     * 使用RequestAttributes获取主线程传递的数据
     * @return
     * @throws InterruptedException
     */
    @Async("asyncServiceExecutor")
    public CompletableFuture<String> executeValueAsync3() throws InterruptedException {
        log.info("start executeValueAsync");
        System.out.println("异步线程执行返回结果......+");
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        Object userId = attributes.getAttribute("userId", 0);
        log.info("end executeValueAsync");
        return CompletableFuture.completedFuture(userId.toString());
    }

Test2Controller

    /**
     * RequestContextHolder+TaskDecorator的方式
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @GetMapping("/test3")
    public String test3() throws InterruptedException, ExecutionException {
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        attributes.setAttribute("userId","123456",0);
        CompletableFuture<String> completableFuture = asyncService.executeValueAsync3();
        String s = completableFuture.get();
        return s;
    }

4 MDC+TaskDecorator

自定义MDCTaskDecorator

/**
 * 线程池修饰类
 */
public class MDCTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // 获取主线程中的请求信息(我们的用户信息也放在里面)
        String userId = MDC.get("userId");
        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
        System.out.println(copyOfContextMap);
        return () -> {
            try {
                // 将主线程的请求信息,设置到子线程中
                MDC.put("userId",userId);
                // 执行子线程,这一步不要忘了
                runnable.run();
            } finally {
                // 线程结束,清空这些信息,否则可能造成内存泄漏
                MDC.clear();
            }
        };
    }
}

ExecutorConfig

在原来的基础上增加 executor.setTaskDecorator(new MDCTaskDecorator());

@Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor----------------");
        //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //使用可视化运行状态的线程池
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //增加MDC的线程池修饰类
        executor.setTaskDecorator(new MDCTaskDecorator());
        //执行初始化
        executor.initialize();
        log.info("end asyncServiceExecutor------------");
        return executor;
    }

AsyncServiceImpl

         /**
     * 使用MDC获取主线程传递的数据
     * @return
     * @throws InterruptedException
     */
    @Async("asyncServiceExecutor")
    public CompletableFuture<String> executeValueAsync5() throws InterruptedException {
        log.info("start executeValueAsync");
        System.out.println("异步线程执行返回结果......+");
        log.info("end executeValueAsync");
        return CompletableFuture.completedFuture(MDC.get("userId"));
    }

Test2Controller

     /**
     * 使用MDC+TaskDecorator方式
     * 本质也是ThreadLocal+TaskDecorator方式
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @GetMapping("/test5")
    public String test5() throws InterruptedException, ExecutionException {
        MDC.put("userId","123456");
        CompletableFuture<String> completableFuture = asyncService.executeValueAsync5();
        String s = completableFuture.get();
        return s;
    }


目录
相关文章
|
7月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
371 1
|
7月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
349 1
|
8月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
Java 数据库 Spring
342 0
|
8月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
516 16
|
9月前
|
缓存 并行计算 安全
关于Java多线程详解
本文深入讲解Java多线程编程,涵盖基础概念、线程创建与管理、同步机制、并发工具类、线程池、线程安全集合、实战案例及常见问题解决方案,助你掌握高性能并发编程技巧,应对多线程开发中的挑战。
|
9月前
|
数据采集 存储 前端开发
Java爬虫性能优化:多线程抓取JSP动态数据实践
Java爬虫性能优化:多线程抓取JSP动态数据实践
|
7月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
692 0
|
7月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
288 6
|
10月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
435 83