java并发编程实践——王宝令(极客时间)学习笔记

简介: java并发编程实践——王宝令(极客时间)学习笔记

1、并发

分工:如何高效地拆解任务并分配给线程

同步:线程之间如何协作

互斥:保证同一时刻只允许一个线程访问共享资源

Fork/Join 框架就是一种分工模式,CountDownLatch 就是一种典型的同步方式,而可重入锁则是一种互斥手段。

2、可见性、原子性、有序性

(1)可见性:缓存导致

(2)原子性:线程切换

count+=1

(3)有序性:编译优化

3、java内存模型

(1)可见性:缓存导致-----按需禁用缓存

(2)有序性:编译优化-----按需禁用

volatile int x=0;(该变量的读写,不使用cpu缓存,直接使用内存读取或者写入)

(3)原子性:同一时刻,只有一个线程执行,互斥。

synchronized

4、死锁

死锁发生的条件:

(1)互斥,共享资源x和y只能被一个线程占用

(2)占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;

破坏占用且等待条件:一次性申请所有资源

(3)不可抢占,其他线程不能强行抢占线程 T1 占有的资源;

破坏不可强占条件

(4)循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。

破坏循环等待条件:

wait和sleep区别
1:wait释放资源,sleep不释放资源
2:wait需要被唤醒,sleep不需要
3:wait需要获取到监视器,否则抛异常,sleep不需要
4:wait是object顶级父类的方法,sleep则是Thread的方法

5.CountDownLatch和CyclicBarrier:如何让多线程步调一致?(主线程等待子线程结束)

Thread t1 = new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t1.start();
        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t2.start();
        //实现等待
        t1.join();
        t2.join();
        System.out.println("=============");

线程池

Executor executor = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(2);
        executor.execute(()->{
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName());
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executor.execute(()->{
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName());
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        latch.await();
        System.out.println("=============");

CountDownLatch 主要用来解决一个线程等待多个线程的场景。(CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。)

CyclicBarrier   ---------- A线程执行,B线程执行,A、B其中一个线程等到AB执行完成再执行(不是主线程,且是异步的)

参考:https://www.cnblogs.com/dolphin0520/p/3920397.html

6.并发容器

List、Map、Set、Queue

非线程安全:ArrayList、HashMap

7.原子类

8.线程池、Executor

ThreadPoolExecutor

线程池实际上是生产者 - 消费者模式

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler) 

https://blog.csdn.net/weixin_36586120/article/details/109274771

线程池大小设置:

cpu(计算型)

io型

https://stefan.blog.csdn.net/article/details/109234348

1.当创建的线程数小于核心线程数corePoolSize时,提交任务会继续创建新线程执行任务。

2.当创建的线程数大于等于corePoolSize时,此时再提交任务将被添加到工作队列workQueue中。

3.当工作队列workQueue已满,此时再提交任务会创建新线程,触发第二个阈值的判断maximumPoolSize。

4.当创建的线程数大于等于最大线程数maximumPoolSize时,此时再提交任务将触发拒绝策略RejectedExecutionHandler

9.Future

ExecutorService executor 
  = Executors.newFixedThreadPool(1);
// 创建 Result 对象 r
Result r = new Result();
r.setAAA(a);
// 提交任务
Future<Result> future = 
  executor.submit(new Task(r), r);  
Result fr = future.get();
// 下面等式成立
fr === r;
fr.getAAA() === a;
fr.getXXX() === x
class Task implements Runnable{
  Result r;
  // 通过构造函数传入 result
  Task(Result r){
    this.r = r;
  }
  void run() {
    // 可以操作 result
    a = r.getAAA();
    r.setXXX(x);
  }
}
// 创建 FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = 
  Executors.newCachedThreadPool();
// 提交 FutureTask 
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
// 创建 FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();

// 创建任务 T2 的 FutureTask
FutureTask<String> ft2
  = new FutureTask<>(new T2Task());
// 创建任务 T1 的 FutureTask
FutureTask<String> ft1
  = new FutureTask<>(new T1Task(ft2));
// 线程 T1 执行任务 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程 T2 执行任务 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程 T1 执行结果
System.out.println(ft1.get());
// T1Task 需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
  FutureTask<String> ft2;
  // T1 任务需要 T2 任务的 FutureTask
  T1Task(FutureTask<String> ft2){
    this.ft2 = ft2;
  }
  @Override
  String call() throws Exception {
    System.out.println("T1: 洗水壶...");
    TimeUnit.SECONDS.sleep(1);
    System.out.println("T1: 烧开水...");
    TimeUnit.SECONDS.sleep(15);
    // 获取 T2 线程的茶叶  
    String tf = ft2.get();
    System.out.println("T1: 拿到茶叶:"+tf);
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  }
}
// T2Task 需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
  @Override
  String call() throws Exception {
    System.out.println("T2: 洗茶壶...");
    TimeUnit.SECONDS.sleep(1);
    System.out.println("T2: 洗茶杯...");
    TimeUnit.SECONDS.sleep(2);
    System.out.println("T2: 拿茶叶...");
    TimeUnit.SECONDS.sleep(1);
    return " 龙井 ";
  }
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

10.CompletableFuture

// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1: 洗水壶...");
  sleep(1, TimeUnit.SECONDS);
  System.out.println("T1: 烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2: 洗茶壶...");
  sleep(1, TimeUnit.SECONDS);
  System.out.println("T2: 洗茶杯...");
  sleep(2, TimeUnit.SECONDS);
  System.out.println("T2: 拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1: 拿到茶叶:" + tf);
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  });
// 等待任务 3 执行结果
System.out.println(f3.join());
void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

11.CompletionService

参考: http://blog.csdn.net/lmj623565791/article/details/27250059   https://www.cnblogs.com/hrhguanli/p/3998865.html

普通情况下,我们使用Runnable作为主要的任务表示形式,可是Runnable是一种有非常大局限的抽象,run方法中仅仅能记录日志,打印,或者把数据汇总入某个容器(一方面内存消耗大,还有一方面须要控制同步,效率非常大的限制),总之不能返回运行的结果;比方同一时候1000个任务去网络上抓取数据,然后将抓取到的数据进行处理(处理方式不定),我认为最好的方式就是提供回调接口,把处理的方式最为回调传进去;可是如今我们有了更好的方式实现:CompletionService + Callable

Callable的call方法能够返回运行的结果;

CompletionService将Executor(线程池)和BlockingQueue(堵塞队列)结合在一起,同一时候使用Callable作为任务的基本单元,整个过程就是生产者不断把Callable任务放入堵塞对了,Executor作为消费者不断把任务取出来运行,并返回结果;

优势:

a、堵塞队列防止了内存中排队等待的任务过多,造成内存溢出(毕竟一般生产者速度比較快,比方爬虫准备好网址和规则,就去运行了,运行起来(消费者)还是比較慢的)

b、CompletionService能够实现,哪个任务先运行完毕就返回,而不是按顺序返回,这样能够极大的提升效率;

package com.zhy.concurrency.completionService;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * 将Executor和BlockingQueue功能融合在一起,能够将Callable的任务提交给它来运行, 然后使用take()方法获得已经完毕的结果
 * 
 * @author zhy
 * 
 */
public class CompletionServiceDemo
{
    public static void main(String[] args) throws InterruptedException,
            ExecutionException
    {
        /**
         * 内部维护11个线程的线程池
         */
        ExecutorService exec = Executors.newFixedThreadPool(11);
        /**
         * 容量为10的堵塞队列
         */
        final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>(
                10);
        //实例化CompletionService
        final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
                exec, queue);
        /**
         * 模拟瞬间产生10个任务,且每一个任务运行时间不一致
         */
        for (int i = 0; i < 10; i++)
        {
            completionService.submit(new Callable<Integer>()
            {
                @Override
                public Integer call() throws Exception
                {
                    int ran = new Random().nextInt(1000);
                    Thread.sleep(ran);
                    System.out.println(Thread.currentThread().getName()
                            + " 歇息了 " + ran);
                    return ran;
                }
            });
        }
        /**
         * 马上输出结果
         */
        for (int i = 0; i < 10; i++)
        {
            try
            {    
                //谁最先运行完毕,直接返回
                Future<Integer> f = completionService.take();
                System.out.println(f.get());
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            } catch (ExecutionException e)
            {
                e.printStackTrace();
            }
        }
        exec.shutdown();
    }
}
输出结果:
pool-1-thread-4 歇息了 52
52
pool-1-thread-1 歇息了 59
59
pool-1-thread-10 歇息了 215
215
pool-1-thread-9 歇息了 352
352
pool-1-thread-5 歇息了 389
389
pool-1-thread-3 歇息了 589
589
pool-1-thread-2 歇息了 794
794
pool-1-thread-7 歇息了 805
805
pool-1-thread-6 歇息了 909
909
pool-1-thread-8 歇息了 987
987

2.ExecutorService.invokeAll

ExecutorService的invokeAll方法也能批量运行任务,并批量返回结果,可是呢,有个我认为非常致命的缺点,必须等待全部的任务运行完毕后统一返回,一方面内存持有的时间长;还有一方面响应性也有一定的影响,毕竟大家都喜欢看看刷刷的运行结果输出,而不是苦苦的等待;

package com.zhy.concurrency.executors;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestInvokeAll
{
    public static void main(String[] args) throws InterruptedException,
            ExecutionException
    {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        Callable<Integer> task = null;
        for (int i = 0; i < 10; i++)
        {
            task = new Callable<Integer>()
            {
                @Override
                public Integer call() throws Exception
                {
                    int ran = new Random().nextInt(1000);
                    Thread.sleep(ran);
                    System.out.println(Thread.currentThread().getName()+" 歇息了 " + ran );
                    return ran;
                }
            };
            tasks.add(task);
        }
        long s = System.currentTimeMillis();
        List<Future<Integer>> results = exec.invokeAll(tasks);
        System.out.println("运行任务消耗了 :" + (System.currentTimeMillis() - s) +"毫秒");
        for (int i = 0; i < results.size(); i++)
        {
            try
            {
                System.out.println(results.get(i).get());
            } catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        exec.shutdown();
    }
}
运行结果:
pool-1-thread-10 歇息了 1
pool-1-thread-5 歇息了 59
pool-1-thread-6 歇息了 128
pool-1-thread-1 歇息了 146
pool-1-thread-3 歇息了 158
pool-1-thread-7 歇息了 387
pool-1-thread-9 歇息了 486
pool-1-thread-8 歇息了 606
pool-1-thread-4 歇息了 707
pool-1-thread-2 歇息了 817
运行任务消耗了 :819毫秒
146
817
158
707
59
128
387
606
486
1

12.Fork/Join 单机版本的MapReduce

分治任务

static void main(String[] args){
  // 创建分治任务线程池  
  ForkJoinPool fjp = 
    new ForkJoinPool(4);
  // 创建分治任务
  Fibonacci fib = 
    new Fibonacci(30);   
  // 启动分治任务  
  Integer result = 
    fjp.invoke(fib);
  // 输出结果  
  System.out.println(result);
}
// 递归任务
static class Fibonacci extends 
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 = 
      new Fibonacci(n - 1);
    // 创建子任务  
    f1.fork();
    Fibonacci f2 = 
      new Fibonacci(n - 2);
    // 等待子任务结果,并合并结果  
    return f2.compute() + f1.join();
  }
}

13.ThreadLocal

https://baijiahao.baidu.com/s?id=1609916413785756020&wfr=spider&for=pc


相关文章
|
1月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
32 0
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
15天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
19天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
54 12
|
15天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
98 2
|
1月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
51 3
|
1月前
|
安全 Java 数据库连接
Java中的异常处理:理解与实践
在Java的世界里,异常处理是维护代码健壮性的守门人。本文将带你深入理解Java的异常机制,通过直观的例子展示如何优雅地处理错误和异常。我们将从基本的try-catch结构出发,探索更复杂的finally块、自定义异常类以及throw关键字的使用。文章旨在通过深入浅出的方式,帮助你构建一个更加稳定和可靠的应用程序。
34 5
|
1月前
|
安全 Java 程序员
Java内存模型的深入理解与实践
本文旨在深入探讨Java内存模型(JMM)的核心概念,包括原子性、可见性和有序性,并通过实例代码分析这些特性在实际编程中的应用。我们将从理论到实践,逐步揭示JMM在多线程编程中的重要性和复杂性,帮助读者构建更加健壮的并发程序。