《JUC并发编程 - 基础篇》 Callable接口 | 辅助类 | 读写锁 | 阻塞队列 | 线程池 | Stream流 | 分支合并框架(一)

简介: 《JUC并发编程 - 基础篇》 Callable接口 | 辅助类 | 读写锁 | 阻塞队列 | 线程池 | Stream流 | 分支合并框架

7、Callable接口


引入: 面试题:获得多线程的方法几种?


(1)继承thread类(2)runnable接口

如果只回答这两个你连被问到juc的机会都没有


(3) 实现Callable接口


7.1 是什么?


4f6d95a8c46f561e0845b26dc3694340.png


这是一个函数式接口,因此可以用作lambda表达式或方法引用的赋值对象。


7.2 与runnable对比


实现方法对比:

//创建新类MyThread实现runnable接口
class MyThread implements Runnable{
     @Override
     public void run() {
     }
}
//新类MyThread2实现callable接口
class MyThread2 implements Callable<Integer>{
     @Override
     public Integer call() throws Exception {
      return 200;
     } 
}

面试题:callable接口与runnable接口的区别?


答:(1)是否有返回值

(2)是否抛异常

(3)落地方法不一样,一个是run,一个是call


7.3 怎么用


直接替换runnable是否可行?


不可行,因为:thread类的构造方法根本没有Callable


8708efd3180d2d210a9da0c408b441e2.png


解决办法:认识不同的人找中间人


这像认识一个不认识的同学,我可以找中间人介绍。

中间人是什么?java多态,一个类可以实现多个接口!!


67a8676f2633b38b907c81dc4394a49a.png


运行成功后如何获得返回值?ft.get();


2049aca7e1dd95c52ccc4aeb1dcf47ce.png


实现代码


class MyThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(4);
        System.out.println("*********come in here");
        return 1024;
    }
}
/**
 * @author lxy
 * @version 1.0
 * @Description 多线程中,第三种获得多线程的方式
 * @date 2022/4/29 16:55
 */
public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask <Integer> futureTask = new FutureTask <>(new MyThread());
        new Thread(futureTask,"A").start();//只被执行一次.
        new Thread(futureTask,"B").start();
        System.out.println(Thread.currentThread().getName());
        System.out.println(futureTask.get());
    }
}

7.4 FutureTask


7.4.1 FutureTask原理


未来的任务,用它就干一件事,异步调用

main方法就像一个冰糖葫芦,一个个方法由main串起来。

但解决不了一个问题:正常调用挂起堵塞问题 , 比如一个方法执行起来很花费时间…我们可以将它单独另启动一个线程A去做,主线程继续往下执行,最终在主线程结束之前将A的结果再进行操作… 可以大大提高效率.


7084e67fc0c888ec2621bcef35e541a5.png


例子:

(1)老师上着课,口渴了,去买水不合适,讲课线程继续,我可以单起个线程找班长帮忙买水,

水买回来了放桌上,我需要的时候再去get。

(2)4个同学,A算1+20,B算21+30,C算31*到40,D算41+50,是不是C的计算量有点大啊,

FutureTask单起个线程给C计算,我先汇总ABD,最后等C计算完了再汇总C,拿到最终结果

(3)高考:会做的先做,不会的放在后面做


7.4.2 原理补充


在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。

一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get方法获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。

只计算一次(线程只能被创建执行一次),get方法放到最后


8、JUC强大的辅助类讲解


8.1 CountDownLatch(减少计数)

/**
 *
 * CountDownLatch: 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。
 *
 * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
 * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
 *
 * 解释:6个同学陆续离开教室后值班同学才可以关门。
 *
 * main主线程必须要等前面6个线程完成全部工作后,自己才能开干
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);//给计数器赋值为6
        for (int i = 0; i < 6; i++) {//有六个上自习的同学,各自离开教室的时间不一样
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"\t号同学离开教室");
                countDownLatch.countDown();//每调用一次,计数器减一(也就是一个线程退出)
            },String.valueOf(i)).start();
        }
        countDownLatch.await();//阻塞main线程,当计数器减为0,则唤醒main线程.
        System.out.println(Thread.currentThread().getName()+"\t************班长关门走人,main线程是班长");
    }
}

8.2 CyclicBarrier(循环栅栏)

/**
 * CyclicBarrier:字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,
 * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有
 * 被屏障拦截的线程才会继续干活。
 * 线程进入屏障通过CyclicBarrier的await()方法。
 *
 * 例:集齐7颗龙珠就可以召唤神龙
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //CyclicBarrier(int parties, Runnable barrierAction) 当所有的屏障经历完才干活
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("**********召唤神龙");
        });
        for (int i = 0; i < 7; i++) {
            final int tempInt = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"\t收集到第"+tempInt+"颗龙珠");
                try {
                    cyclicBarrier.await();//进行等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

8.3 Semaphore 信号灯

/**
 *
 * @Description: 信号量
 *
 * 在信号量上我们定义两种操作:
 * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
 *             要么一直等下去,直到有线程释放信号量,或超时。
 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
 *
 * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用(资源被用尽,只能等待,直到有多余资源),
 * 另一个用于并发线程数的控制。(1s 100w访问量)
 * 例:抢车位
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);//资源类,有三个空车位
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();//占车位,车位数-1
                    System.out.println(Thread.currentThread().getName()+"\t抢占到了车位");
                    //暂停一会线程
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"\t离开了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();//车位数+1
                }
            },String.valueOf(i)).start();
        }
    }
}

9、ReentrantReadWriteLock

9.1 读写锁介绍

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。


针对这种场景,JAVA 的并发包提供了读写锁ReentrantReadWriteLock ,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。


线程进入读锁的前提条件:


没有其他线程的写锁

没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。

线程进入写锁的前提条件:


没有其他线程的读锁

没有其他线程的写锁

而读写锁有以下三个重要的特性:

(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。

(2)重进入:读锁和写锁都支持线程重进入。

(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。


9.2 ReentrantReadWriteLock


ReentrantReadWriteLock 类的整体结构


public class ReentrantReadWriteLock implements ReadWriteLock,java.io.Serializable {
    /**
     * 读锁
     */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /**
     * 写锁
     */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;
    /**
     * 使用默认(非公平)的排序属性创建一个新的
     * ReentrantReadWriteLock
     */
    public ReentrantReadWriteLock() {
        this(false);
    }
    /**
     * 使用给定的公平策略创建一个新的 ReentrantReadWriteLock
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    /**
     * 返回用于写入操作的锁
     */
    public ReentrantReadWriteLock.WriteLock writeLock() {
        return writerLock;
    }
    /**
     * 返回用于读取操作的锁
     */
    public ReentrantReadWriteLock.ReadLock readLock() {
        return readerLock;
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
    }
    static final class NonfairSync extends Sync {
    }
    static final class FairSync extends Sync {
    }
    public static class ReadLock implements Lock, java.io.Serializable {
    }
    public static class WriteLock implements Lock, java.io.Serializable {
    }
}

可以看到,ReentrantReadWriteLock 实现了ReadWriteLock 接口,ReadWriteLock 接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable 接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock 实现了自己的序列化逻辑。


9.3 入门案例


场景: 使用ReentrantReadWriteLock 对一个hashmap 进行读和写操作


package com.rg.juc;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache {
    private volatile Map <String, Object> map = new HashMap <>();
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    public void put(String key, Object value) {
        readWriteLock.writeLock().lock();//上写锁
        try {
            System.out.println(Thread.currentThread().getName() + "\t ----写入数据" + key);
            //网络拥堵(暂停一会线程)
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "\t ----写入完成"+key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
    public void get(String key) {
        readWriteLock.readLock().lock();//读锁
        try {
            System.out.println(Thread.currentThread().getName() + "\t ----读取数据" + key);
            //网络拥堵(暂停一会线程)
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Object result = map.get(key);
            System.out.println(Thread.currentThread().getName() + "\t ----读取完成" + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/5/1 11:59
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        //五个读线程
        for (int i = 1; i <= 5; i++) {
            final int tempInt = i;
            new Thread(() -> {
                myCache.put(tempInt + "", tempInt + "");
            }, String.valueOf(i)).start();
        }
        //五个写线程
        for (int i = 1; i <= 5; i++) {
            final int tempInt = i;
            new Thread(() -> {
                myCache.get(tempInt + "");
            }, String.valueOf(i)).start();
        }
    }
}
相关文章
|
16天前
|
Java 开发者
解锁并发编程新姿势!深度揭秘AQS独占锁&ReentrantLock重入锁奥秘,Condition条件变量让你玩转线程协作,秒变并发大神!
【8月更文挑战第4天】AQS是Java并发编程的核心框架,为锁和同步器提供基础结构。ReentrantLock基于AQS实现可重入互斥锁,比`synchronized`更灵活,支持可中断锁获取及超时控制。通过维护计数器实现锁的重入性。Condition接口允许ReentrantLock创建多个条件变量,支持细粒度线程协作,超越了传统`wait`/`notify`机制,助力开发者构建高效可靠的并发应用。
36 0
|
2月前
|
分布式计算 并行计算 安全
在Python Web开发中,Python的全局解释器锁(Global Interpreter Lock,简称GIL)是一个核心概念,它直接影响了Python程序在多线程环境下的执行效率和性能表现
【6月更文挑战第30天】Python的GIL是CPython中的全局锁,限制了多线程并行执行,尤其是在多核CPU上。GIL确保同一时间仅有一个线程执行Python字节码,导致CPU密集型任务时多线程无法充分利用多核,反而可能因上下文切换降低性能。然而,I/O密集型任务仍能受益于线程交替执行。为利用多核,开发者常选择多进程、异步IO或使用不受GIL限制的Python实现。在Web开发中,理解GIL对于优化并发性能至关重要。
46 0
|
5天前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
26天前
|
算法 Java 编译器
多线程线程安全问题之系统层面的锁优化有哪些常见的策略
多线程线程安全问题之系统层面的锁优化有哪些常见的策略
|
26天前
|
安全 云计算
云计算自旋锁问题之在线程安全地删除链表节点时,需要频繁加锁会影响性能如何解决
云计算自旋锁问题之在线程安全地删除链表节点时,需要频繁加锁会影响性能如何解决
29 2
|
5天前
|
缓存 Java
JUC(4)Callable和常用的辅助类
这篇文章介绍了Java并发工具包中的`Callable`接口及其与`Runnable`的区别,以及几个常用的并发辅助类,如`CountDownLatch`、`CyclicBarrier`和`Semaphore`,并提供了它们使用方式的示例代码。
|
26天前
|
Java
多线程线程安全问题之什么是锁的粒度,减少锁的粒度有哪些好处
多线程线程安全问题之什么是锁的粒度,减少锁的粒度有哪些好处
|
26天前
多线程线程安全问题之synchronized和ReentrantLock在锁的释放上有何不同
多线程线程安全问题之synchronized和ReentrantLock在锁的释放上有何不同
|
1月前
|
安全 算法 Java
Java 中的并发控制:锁与线程安全
在 Java 的并发编程领域,理解并正确使用锁机制是实现线程安全的关键。本文深入探讨了 Java 中各种锁的概念、用途以及它们如何帮助开发者管理并发状态。从内置的同步关键字到显式的 Lock 接口,再到原子变量和并发集合,本文旨在为读者提供一个全面的锁和线程安全的知识框架。通过具体示例和最佳实践,我们展示了如何在多线程环境中保持数据的一致性和完整性,同时避免常见的并发问题,如死锁和竞态条件。无论你是 Java 并发编程的新手还是有经验的开发者,这篇文章都将帮助你更好地理解和应用 Java 的并发控制机制。
|
20天前
|
消息中间件 算法 Java
(十四)深入并发之线程、进程、纤程、协程、管程与死锁、活锁、锁饥饿详解
本文深入探讨了并发编程的关键概念和技术挑战。首先介绍了进程、线程、纤程、协程、管程等概念,强调了这些概念是如何随多核时代的到来而演变的,以满足高性能计算的需求。随后,文章详细解释了死锁、活锁与锁饥饿等问题,通过生动的例子帮助理解这些现象,并提供了预防和解决这些问题的方法。最后,通过一个具体的死锁示例代码展示了如何在实践中遇到并发问题,并提供了几种常用的工具和技术来诊断和解决这些问题。本文旨在为并发编程的实践者提供一个全面的理解框架,帮助他们在开发过程中更好地处理并发问题。