守护线程
public class DaemonThreadTest {
public static void main(String[] args) throws InterruptedException {
Runtime.getRuntime().addShutdownHook(new Thread(() -> System.out.println("jvm exit success!! ")));
Thread testThread = new Thread(() -> {
while (true) {
try {
Thread.sleep(5000);
System.out.println("thread still running ....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
testThread.setDaemon(true);
testThread.start();
}
}
1.使用arthas查看线上的程序线程情况 deamon为守护线程
- redis分布式锁续命用守护线程实现
public Boolean tryLock(String key, String value, long expireTime) {
try {
//自旋上限
int waitCount = timeout;
while (waitCount > 0) {
//SET命令返回OK ,则证明获取锁成功
Boolean setIfAbsent = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, TimeUnit.SECONDS);
if (setIfAbsent) {
//续命
Thread demo = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Boolean expire = redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
//有可能已经主动删除key,不需要在续命
if(!expire){
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
demo.setDaemon(true);
demo.start();
return true;
}
//否则循环等待,在timeout时间内仍未获取到锁,则获取失败
Thread.sleep(3000);
waitCount--;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
//没设置到锁,即表示超时了
return false;
}
redission使用的就是这种形式,比如上锁10s,10s后就解锁了,守护线程会一直给锁续命,当主线程退出的时候,守护线程也会跟着退出。
- 那OOM时会不会调用钩子方法?
OOM后钩子方法仍然生效,spring的销毁方法也是生效,分布式锁也可使用,nocas服务取消的时候也在使用
线程优先级
“优先级”这个参数通常并不是那么地“靠谱”,理论上说线程的优先级越高,分配到时间片的几率也就越高,但是在实际运行过程中却并非如此,优先级只能作为一个参考数值,而且具体的线程优先级还和操作系统有关
终止线程
- interrupt方法
static class TestThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.print(i+" ");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class TestThreadWithSync implements Runnable {
@Override
public void run() {
synchronized (this) {
for (int i = 20; i < 30; i++) {
System.out.print(i+" ");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
static class TestThreadWithLock implements Runnable {
ReentrantLock reentrantLock = new ReentrantLock();
@Override
public void run() {
reentrantLock.lock();
try {
for (int i = 30; i < 40; i++) {
System.out.print(i+" ");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
reentrantLock.unlock();
}
}
}
/**
* 增加判断标识 如果标记中断就暂停执行
*/
static class TestInterruptedStop implements Runnable {
@Override
public void run() {
System.out.println("开始执行");
synchronized (this) {
//如果当前线程被中断,这里需要主动退出
while (!Thread.currentThread().isInterrupted()) {
}
System.out.println("end");
}
}
}
public static void main(String[] args) throws InterruptedException {
// Thread testThread = new Thread(new TestThread());
// testThread.start();
Thread testThreadWithSync = new Thread(new TestThreadWithSync());
testThreadWithSync.start();
// Thread testThreadWithLock = new Thread(new TestThreadWithLock());
// testThreadWithLock.start();
// Thread forEverThread = new Thread(new ForEverThread());
// forEverThread.start();
// Thread testInterruptedStop = new Thread(new TestInterruptedStop());
Thread.sleep(2000);
// testInterruptedStop.interrupt();
// 如果线程正常执行 那么调用这个函数是无效的
// forEverThread.interrupt();
// testThread.interrupt();
testThreadWithSync.interrupt();
// testThreadWithLock.interrupt();
}
- shutdown与shutDownNow
网上说的shutDownNow会终止线程池中正在执行的线程,实际操作并不是,其实区别是shutDownNow将队列中没有执行的任务放入到一个 List 集合中,并且返回给调用线程。
锁
- 锁升级
当加锁后一个线程访问时,会进入偏向锁状态,当多个线程访问会进入轻量级锁,当多个竞争的线程抢夺该 monitor 的时候,会采用 CAS 的方式,当抢夺次数超过 10 次,或者当前 CPU 资源占用大于 50% 的时候,该锁就会从轻量级锁的状态上升为了重量级锁。
synchronized与lock
- 支持获取锁超时机制;
public void tryLockMethod_2() { try { if (reentrantLock.tryLock(1, TimeUnit.SECONDS)) { try { i++; } catch (Exception e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } else { //todo } } catch (InterruptedException e) { e.printStackTrace(); } }
- 支持非阻塞方式获取锁;
- 支持可中断方式获取锁。
线程池
1 spring 内部的异步注解
@Service
public class AsyncService {
//加入注解之后,该方法自动就带有了异步的效果
@Async
public void testAsync(){
try {
Thread.sleep(1000*2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" async test");
}
}
直接使用会有一个问题,会无限创建线程,所以要增加配置
@Configuration
public class AsyncExecuteConfig extends AsyncConfigurerSupport {
@Bean
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(3);
threadPool.setMaxPoolSize(3);
threadPool.setWaitForTasksToCompleteOnShutdown(true);
threadPool.setAwaitTerminationSeconds(60 * 15);
return threadPool;
}
@Override
public Executor getAsyncExecutor() {
return asyncExecutor();
}
}
@Async会失效吗?
其实他也是通过代理来实现的,如果同一个类中使用@Async也会失效
线程本地变量
ThreadLocal 对象中提供了线程局部变量,它使得每个线程之间互不干扰,一般可以通过重写它的 initialValue 方法机械能赋值。当线程第一次访问它的时候,就会直接触发 initialValue 方法。
是典型的以空间换时间的处理
- 原理
public void set(T value) {
//获取当前请求的线程
Thread t = Thread.currentThread();
//取出Thread类内部的threadLocals变量,这个变量是一个哈希表结构
ThreadLocalMap map = getMap(t);
if (map != null)
//将需要存储的值放入到这个哈希表中
map.set(this, value);
else
createMap(t, value);
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
Java中当一个对象仅被一个弱引用引用时,如果GC运行, 那么这个对象就会被回收。
弱引用的一个特点是它何时被回收是不可确定的;
思考了一下,如果key是弱引用,那么被回收先不说内存泄漏问题,数据本身就回丢失呀,所以操作了一下
public class Demo {
static ThreadLocal<OOMObject> local = new ThreadLocal<>();
public static void main(String[] args) {
local.set(new OOMObject("千云"));
System.gc();
OOMObject oomObject = local.get();
System.out.println(oomObject);
}
}
赋值GC后仍然可以得到结果,就很奇怪,弱引用并没有被回收,还要进一步的思考
- 应用
获取路径内方法的执行时长
@Configuration
public class TimeCountInterceptor implements HandlerInterceptor
{
static class CommonThreadLocal<Long> extends ThreadLocal{
@Override
protected Object initialValue() {
return -1;
}
}
private static ThreadLocal<Long> timeCount = new CommonThreadLocal<>();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
System.out.println("提前赋值的获取:"+ timeCount.get());
//中间写逻辑代码,比如判断是否登录成功,失败则返回false
timeCount.set(System.currentTimeMillis());
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) {
long currentTime = System.currentTimeMillis();
long startTime = timeCount.get();
long timeUse = currentTime - startTime;
System.out.println(Thread.currentThread().getName() + "耗时:" + timeUse + "ms");
timeCount.remove();
}
}
多线程优化查询速度
CompletableFuture和Future都可以进行优化查询,java8的parallelStream很实用
public List<UserInfoDTO> batchQueryWithParallelV1(List<Long> userIdList) {
List<UserInfoDTO> resultList = new ArrayList<>();
//并发调用
userIdList.parallelStream().forEach(userId -> {
UserInfoDTO userInfoDTO = userQueryService.queryUserInfoWrapper(userId);
resultList.add(userInfoDTO);
});
return resultList;
}
线程限流
单机版限流,使用Semaphore来实现,如果超过定义的数量那么就丢弃,如果是分布式的服务部署,这种形式就不ok了,要采用redis的形式了
/**
* @Description:单机版限流
* @author: yjw
* @date: 2021/12/20
*/
@Slf4j
@RestController
public class SimpleLimitController {
private Semaphore semaphore = new Semaphore(2);
@GetMapping("do-test-limit")
public void doTest() {
boolean status = false;
try {
//限制流量速度
status = semaphore.tryAcquire();
if (status) {
this.doSomeBiz();
}
} catch (Exception e) {
log.error("[doTest] error is ", e);
} finally {
if (status) {
semaphore.release();
}
}
}
/**
* 执行业务逻辑
*/
private void doSomeBiz() throws InterruptedException {
System.out.println(Thread.currentThread().getName());
Thread.sleep(20000);
}
}