目录
1. 首先定义一个数据的上下文的class文件
import java.util.Map;
public class DataCaptureContext {
private static final ThreadLocal<Map<String, Object>> threadLocalDataCapture = new ThreadLocal();
public static final String CURRENT_TIME = "current_time";
public DataCaptureContext() {
}
public static Map getMapData() {
Map mapData = null;
Object obj = threadLocalDataCapture.get();
if (obj != null) {
mapData = (Map)obj;
}
return mapData;
}
public static void setMapData(Map<String, Object> mapData) {
if (mapData != null) {
threadLocalDataCapture.set(mapData);
}
}
public static void removeMapData() {
threadLocalDataCapture.remove();
}
}
2.定义一个springboot线程池的全局方法:
2
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class SpringBootAsyncConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringBootMvcConfig.class);
@Value("${spring.async.thread.pool.core-pool-size}")
private int corePoolSize = 10;
@Value("${spring.async.thread.pool.max-pool-size}")
private int maxPoolSize = 1000;
@Value("${spring.async.thread.pool.queue-capacity}")
private int queueCapacity = 1000;
@Value("${spring.async.thread.pool.keep-alive-seconds}")
private int keepAliveSeconds = 600;
public SpringBootAsyncConfig() {
}
@Bean({"asyncThreadPool"})
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(this.corePoolSize);
executor.setMaxPoolSize(this.maxPoolSize);
executor.setQueueCapacity(this.queueCapacity);
executor.setKeepAliveSeconds(this.keepAliveSeconds);
executor.setThreadNamePrefix("AsyncThreadPool-");
executor.setRejectedExecutionHandler(new AbortPolicy());
executor.setTaskDecorator(new TaskDecorator() {
public Runnable decorate(Runnable runnable) {
Map<String, Object> captureMapData = DataCaptureContext.getMapData();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
if (mdcMap != null) {
MDC.setContextMap(mdcMap);
}
if (captureMapData != null) {
DataCaptureContext.setMapData(captureMapData);
}
runnable.run();
} finally {
MDC.clear();
DataCaptureContext.removeMapData();
}
};
}
});
executor.initialize();
return executor;
}
@Bean({"springSessionRedisTaskExecutor"})
public ThreadPoolTaskExecutor springSessionRedisTaskExecutor() {
ThreadPoolTaskExecutor springSessionRedisTaskExecutor = new ThreadPoolTaskExecutor();
springSessionRedisTaskExecutor.setCorePoolSize(10);
springSessionRedisTaskExecutor.setMaxPoolSize(10);
springSessionRedisTaskExecutor.setKeepAliveSeconds(600);
springSessionRedisTaskExecutor.setQueueCapacity(1000);
springSessionRedisTaskExecutor.setThreadNamePrefix("Spring session redis executor thread: ");
springSessionRedisTaskExecutor.setRejectedExecutionHandler(new DiscardPolicy());
springSessionRedisTaskExecutor.initialize();
return springSessionRedisTaskExecutor;
}
@Bean({"bankInstructionTaskExecutor"})
public ThreadPoolTaskExecutor bankInstructionTaskExecutor() {
ThreadPoolTaskExecutor bankInstructionTaskExecutor = new ThreadPoolTaskExecutor();
bankInstructionTaskExecutor.setCorePoolSize(20);
bankInstructionTaskExecutor.setMaxPoolSize(20);
bankInstructionTaskExecutor.setKeepAliveSeconds(1800);
bankInstructionTaskExecutor.setQueueCapacity(20);
bankInstructionTaskExecutor.setThreadNamePrefix("bank instruction task executor thread: ");
bankInstructionTaskExecutor.setRejectedExecutionHandler(new DiscardPolicy());
bankInstructionTaskExecutor.setTaskDecorator(new TaskDecorator() {
public Runnable decorate(Runnable runnable) {
Map<String, Object> captureMapData = DataCaptureContext.getMapData();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
if (mdcMap != null) {
MDC.setContextMap(mdcMap);
}
if (captureMapData != null) {
DataCaptureContext.setMapData(captureMapData);
}
runnable.run();
} finally {
MDC.clear();
DataCaptureContext.removeMapData();
}
};
}
});
bankInstructionTaskExecutor.initialize();
return bankInstructionTaskExecutor;
}
@Bean({"instructionStatusUpdateTaskExecutor"})
public ThreadPoolTaskExecutor instructionStatusUpdateTaskExecutor() {
ThreadPoolTaskExecutor bankInstructionTaskExecutor = new ThreadPoolTaskExecutor();
bankInstructionTaskExecutor.setCorePoolSize(20);
bankInstructionTaskExecutor.setMaxPoolSize(20);
bankInstructionTaskExecutor.setKeepAliveSeconds(1800);
bankInstructionTaskExecutor.setQueueCapacity(20);
bankInstructionTaskExecutor.setThreadNamePrefix("instruction status update executor thread: ");
bankInstructionTaskExecutor.setRejectedExecutionHandler(new DiscardPolicy());
bankInstructionTaskExecutor.setTaskDecorator(new TaskDecorator() {
public Runnable decorate(Runnable runnable) {
Map<String, Object> captureMapData = DataCaptureContext.getMapData();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
if (mdcMap != null) {
MDC.setContextMap(mdcMap);
}
if (captureMapData != null) {
DataCaptureContext.setMapData(captureMapData);
}
runnable.run();
} finally {
MDC.clear();
DataCaptureContext.removeMapData();
}
};
}
});
bankInstructionTaskExecutor.initialize();
return bankInstructionTaskExecutor;
}
3.关于一些常量的定义,直接在config里面配置即可
编辑