1 背景
在实际开发过程中我们需要父子之间传递一些数据,比如用户信息,日志异步生成数据传递等,该文章从5种解决方案解决父子之间数据传递困扰
2 ThreadLocal+TaskDecorator
用户工具类 UserUtils
/** *使用ThreadLocal存储共享的数据变量,如登录的用户信息 */ public class UserUtils { private static final ThreadLocal<String> userLocal=new ThreadLocal<>(); public static String getUserId(){ return userLocal.get(); } public static void setUserId(String userId){ userLocal.set(userId); } public static void clear(){ userLocal.remove(); } }
自定义CustomTaskDecorator
/** * 线程池修饰类 */ public class CustomTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 获取主线程中的请求信息(我们的用户信息也放在里面) String robotId = UserUtils.getUserId(); System.out.println(robotId); return () -> { try { // 将主线程的请求信息,设置到子线程中 UserUtils.setUserId(robotId); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 UserUtils.clear(); } }; } }
ExecutorConfig
在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可视化运行状态的线程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加线程池修饰类 executor.setTaskDecorator(new CustomTaskDecorator()); //增加MDC的线程池修饰类 //executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用ThreadLocal方式传递 * 带有返回值 * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String> executeValueAsync2() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("异步线程执行返回结果......+"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(UserUtils.getUserId()); }
Test2Controller
/** * 使用ThreadLocal+TaskDecorator的方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test2") public String test2() throws InterruptedException, ExecutionException { UserUtils.setUserId("123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync2(); String s = completableFuture.get(); return s; }
3 RequestContextHolder+TaskDecorator
自定义CustomTaskDecorator
/** * 线程池修饰类 */ public class CustomTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 获取主线程中的请求信息(我们的用户信息也放在里面) RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); return () -> { try { // 将主线程的请求信息,设置到子线程中 RequestContextHolder.setRequestAttributes(attributes); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 RequestContextHolder.resetRequestAttributes(); } }; } }
ExecutorConfig
在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可视化运行状态的线程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加线程池修饰类 executor.setTaskDecorator(new CustomTaskDecorator()); //增加MDC的线程池修饰类 //executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用RequestAttributes获取主线程传递的数据 * @return * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String> executeValueAsync3() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("异步线程执行返回结果......+"); RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); Object userId = attributes.getAttribute("userId", 0); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(userId.toString()); }
Test2Controller
/** * RequestContextHolder+TaskDecorator的方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test3") public String test3() throws InterruptedException, ExecutionException { RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); attributes.setAttribute("userId","123456",0); CompletableFuture<String> completableFuture = asyncService.executeValueAsync3(); String s = completableFuture.get(); return s; }
4 MDC+TaskDecorator
自定义MDCTaskDecorator
/** * 线程池修饰类 */ public class MDCTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // 获取主线程中的请求信息(我们的用户信息也放在里面) String userId = MDC.get("userId"); Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap(); System.out.println(copyOfContextMap); return () -> { try { // 将主线程的请求信息,设置到子线程中 MDC.put("userId",userId); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 MDC.clear(); } }; } }
ExecutorConfig
在原来的基础上增加 executor.setTaskDecorator(new MDCTaskDecorator());
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可视化运行状态的线程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //增加MDC的线程池修饰类 executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
AsyncServiceImpl
/** * 使用MDC获取主线程传递的数据 * @return * @throws InterruptedException */ @Async("asyncServiceExecutor") public CompletableFuture<String> executeValueAsync5() throws InterruptedException { log.info("start executeValueAsync"); System.out.println("异步线程执行返回结果......+"); log.info("end executeValueAsync"); return CompletableFuture.completedFuture(MDC.get("userId")); }
Test2Controller
/** * 使用MDC+TaskDecorator方式 * 本质也是ThreadLocal+TaskDecorator方式 * @return * @throws InterruptedException * @throws ExecutionException */ @GetMapping("/test5") public String test5() throws InterruptedException, ExecutionException { MDC.put("userId","123456"); CompletableFuture<String> completableFuture = asyncService.executeValueAsync5(); String s = completableFuture.get(); return s; }