《JUC并发编程 - 基础篇》 Callable接口 | 辅助类 | 读写锁 | 阻塞队列 | 线程池 | Stream流 | 分支合并框架(一)

简介: 《JUC并发编程 - 基础篇》 Callable接口 | 辅助类 | 读写锁 | 阻塞队列 | 线程池 | Stream流 | 分支合并框架

7、Callable接口


引入: 面试题:获得多线程的方法几种?


(1)继承thread类(2)runnable接口

如果只回答这两个你连被问到juc的机会都没有


(3) 实现Callable接口


7.1 是什么?


4f6d95a8c46f561e0845b26dc3694340.png


这是一个函数式接口,因此可以用作lambda表达式或方法引用的赋值对象。


7.2 与runnable对比


实现方法对比:

//创建新类MyThread实现runnable接口
class MyThread implements Runnable{
     @Override
     public void run() {
     }
}
//新类MyThread2实现callable接口
class MyThread2 implements Callable<Integer>{
     @Override
     public Integer call() throws Exception {
      return 200;
     } 
}

面试题:callable接口与runnable接口的区别?


答:(1)是否有返回值

(2)是否抛异常

(3)落地方法不一样,一个是run,一个是call


7.3 怎么用


直接替换runnable是否可行?


不可行,因为:thread类的构造方法根本没有Callable


8708efd3180d2d210a9da0c408b441e2.png


解决办法:认识不同的人找中间人


这像认识一个不认识的同学,我可以找中间人介绍。

中间人是什么?java多态,一个类可以实现多个接口!!


67a8676f2633b38b907c81dc4394a49a.png


运行成功后如何获得返回值?ft.get();


2049aca7e1dd95c52ccc4aeb1dcf47ce.png


实现代码


class MyThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(4);
        System.out.println("*********come in here");
        return 1024;
    }
}
/**
 * @author lxy
 * @version 1.0
 * @Description 多线程中,第三种获得多线程的方式
 * @date 2022/4/29 16:55
 */
public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask <Integer> futureTask = new FutureTask <>(new MyThread());
        new Thread(futureTask,"A").start();//只被执行一次.
        new Thread(futureTask,"B").start();
        System.out.println(Thread.currentThread().getName());
        System.out.println(futureTask.get());
    }
}

7.4 FutureTask


7.4.1 FutureTask原理


未来的任务,用它就干一件事,异步调用

main方法就像一个冰糖葫芦,一个个方法由main串起来。

但解决不了一个问题:正常调用挂起堵塞问题 , 比如一个方法执行起来很花费时间…我们可以将它单独另启动一个线程A去做,主线程继续往下执行,最终在主线程结束之前将A的结果再进行操作… 可以大大提高效率.


7084e67fc0c888ec2621bcef35e541a5.png


例子:

(1)老师上着课,口渴了,去买水不合适,讲课线程继续,我可以单起个线程找班长帮忙买水,

水买回来了放桌上,我需要的时候再去get。

(2)4个同学,A算1+20,B算21+30,C算31*到40,D算41+50,是不是C的计算量有点大啊,

FutureTask单起个线程给C计算,我先汇总ABD,最后等C计算完了再汇总C,拿到最终结果

(3)高考:会做的先做,不会的放在后面做


7.4.2 原理补充


在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。

一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get方法获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。

只计算一次(线程只能被创建执行一次),get方法放到最后


8、JUC强大的辅助类讲解


8.1 CountDownLatch(减少计数)

/**
 *
 * CountDownLatch: 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。
 *
 * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
 * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
 *
 * 解释:6个同学陆续离开教室后值班同学才可以关门。
 *
 * main主线程必须要等前面6个线程完成全部工作后,自己才能开干
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);//给计数器赋值为6
        for (int i = 0; i < 6; i++) {//有六个上自习的同学,各自离开教室的时间不一样
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"\t号同学离开教室");
                countDownLatch.countDown();//每调用一次,计数器减一(也就是一个线程退出)
            },String.valueOf(i)).start();
        }
        countDownLatch.await();//阻塞main线程,当计数器减为0,则唤醒main线程.
        System.out.println(Thread.currentThread().getName()+"\t************班长关门走人,main线程是班长");
    }
}

8.2 CyclicBarrier(循环栅栏)

/**
 * CyclicBarrier:字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,
 * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有
 * 被屏障拦截的线程才会继续干活。
 * 线程进入屏障通过CyclicBarrier的await()方法。
 *
 * 例:集齐7颗龙珠就可以召唤神龙
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //CyclicBarrier(int parties, Runnable barrierAction) 当所有的屏障经历完才干活
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("**********召唤神龙");
        });
        for (int i = 0; i < 7; i++) {
            final int tempInt = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"\t收集到第"+tempInt+"颗龙珠");
                try {
                    cyclicBarrier.await();//进行等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

8.3 Semaphore 信号灯

/**
 *
 * @Description: 信号量
 *
 * 在信号量上我们定义两种操作:
 * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
 *             要么一直等下去,直到有线程释放信号量,或超时。
 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
 *
 * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用(资源被用尽,只能等待,直到有多余资源),
 * 另一个用于并发线程数的控制。(1s 100w访问量)
 * 例:抢车位
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);//资源类,有三个空车位
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();//占车位,车位数-1
                    System.out.println(Thread.currentThread().getName()+"\t抢占到了车位");
                    //暂停一会线程
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"\t离开了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();//车位数+1
                }
            },String.valueOf(i)).start();
        }
    }
}

9、ReentrantReadWriteLock

9.1 读写锁介绍

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。


针对这种场景,JAVA 的并发包提供了读写锁ReentrantReadWriteLock ,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。


线程进入读锁的前提条件:


没有其他线程的写锁

没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。

线程进入写锁的前提条件:


没有其他线程的读锁

没有其他线程的写锁

而读写锁有以下三个重要的特性:

(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。

(2)重进入:读锁和写锁都支持线程重进入。

(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。


9.2 ReentrantReadWriteLock


ReentrantReadWriteLock 类的整体结构


public class ReentrantReadWriteLock implements ReadWriteLock,java.io.Serializable {
    /**
     * 读锁
     */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /**
     * 写锁
     */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;
    /**
     * 使用默认(非公平)的排序属性创建一个新的
     * ReentrantReadWriteLock
     */
    public ReentrantReadWriteLock() {
        this(false);
    }
    /**
     * 使用给定的公平策略创建一个新的 ReentrantReadWriteLock
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    /**
     * 返回用于写入操作的锁
     */
    public ReentrantReadWriteLock.WriteLock writeLock() {
        return writerLock;
    }
    /**
     * 返回用于读取操作的锁
     */
    public ReentrantReadWriteLock.ReadLock readLock() {
        return readerLock;
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
    }
    static final class NonfairSync extends Sync {
    }
    static final class FairSync extends Sync {
    }
    public static class ReadLock implements Lock, java.io.Serializable {
    }
    public static class WriteLock implements Lock, java.io.Serializable {
    }
}

可以看到,ReentrantReadWriteLock 实现了ReadWriteLock 接口,ReadWriteLock 接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable 接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock 实现了自己的序列化逻辑。


9.3 入门案例


场景: 使用ReentrantReadWriteLock 对一个hashmap 进行读和写操作


package com.rg.juc;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache {
    private volatile Map <String, Object> map = new HashMap <>();
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    public void put(String key, Object value) {
        readWriteLock.writeLock().lock();//上写锁
        try {
            System.out.println(Thread.currentThread().getName() + "\t ----写入数据" + key);
            //网络拥堵(暂停一会线程)
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "\t ----写入完成"+key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
    public void get(String key) {
        readWriteLock.readLock().lock();//读锁
        try {
            System.out.println(Thread.currentThread().getName() + "\t ----读取数据" + key);
            //网络拥堵(暂停一会线程)
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Object result = map.get(key);
            System.out.println(Thread.currentThread().getName() + "\t ----读取完成" + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/5/1 11:59
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        //五个读线程
        for (int i = 1; i <= 5; i++) {
            final int tempInt = i;
            new Thread(() -> {
                myCache.put(tempInt + "", tempInt + "");
            }, String.valueOf(i)).start();
        }
        //五个写线程
        for (int i = 1; i <= 5; i++) {
            final int tempInt = i;
            new Thread(() -> {
                myCache.get(tempInt + "");
            }, String.valueOf(i)).start();
        }
    }
}
相关文章
|
30天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
48 6
|
20天前
|
Java
【JavaEE】——多线程常用类
Callable的call方法,FutureTask类,ReentrantLock可重入锁和对比,Semaphore信号量(PV操作)CountDownLatch锁存器,
|
20天前
|
Java 程序员 调度
【JavaEE】线程创建和终止,Thread类方法,变量捕获(7000字长文)
创建线程的五种方式,Thread常见方法(守护进程.setDaemon() ,isAlive),start和run方法的区别,如何提前终止一个线程,标志位,isinterrupted,变量捕获
|
20天前
|
安全 Java API
【JavaEE】多线程编程引入——认识Thread类
Thread类,Thread中的run方法,在编程中怎么调度多线程
|
2月前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
2月前
|
Java
java线程接口
Thread的构造方法创建对象的时候传入了Runnable接口的对象 ,Runnable接口对象重写run方法相当于指定线程任务,创建线程的时候绑定了该线程对象要干的任务。 Runnable的对象称之为:线程任务对象 不是线程对象 必须要交给Thread线程对象。 通过Thread的构造方法, 就可以把任务对象Runnable,绑定到Thread对象中, 将来执行start方法,就会自动执行Runable实现类对象中的run里面的内容。
48 1
|
2月前
|
Java 开发者
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
52 4
|
2月前
|
安全 Java
在 Java 中使用实现 Runnable 接口的方式创建线程
【10月更文挑战第22天】通过以上内容的介绍,相信你已经对在 Java 中如何使用实现 Runnable 接口的方式创建线程有了更深入的了解。在实际应用中,需要根据具体的需求和场景,合理选择线程创建方式,并注意线程安全、同步、通信等相关问题,以确保程序的正确性和稳定性。
156 11
|
3月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
47 3
|
3月前
|
Java
在Java多线程编程中,实现Runnable接口通常优于继承Thread类
【10月更文挑战第20天】在Java多线程编程中,实现Runnable接口通常优于继承Thread类。原因包括:1) Java只支持单继承,实现接口不受此限制;2) Runnable接口便于代码复用和线程池管理;3) 分离任务与线程,提高灵活性。因此,实现Runnable接口是更佳选择。
73 2