Java 并发包原子操作类解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: Java 并发包原子操作类解析

Java 并发包原子操作类解析


前言

JUC 包中提供了一些列原子操作类,这些类都是使用非阻塞算法CAS实现的,相比使用锁实现原子性操作在性能上有较大提高。


由于原子性操作的原理都大致相同,本文只讲解简单的 AtomicLong 类的原理以及在JDK8中新增的 LongAdder 类原理。


原子变量操作类

JUC 并发包中包含 AtomicInteger、AtomicLong 和 AtomicBoolean 等原子性操作类,原理大致类似,接下来我们看一下 AtomicLong 类。


AtomicLong 是原子性递增或者递减类,内部使用Unsafe来实现,我们看下面的代码。

public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;
    //1. 获取Unsafe实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    //2. 存放变量value的偏移量
    private static final long valueOffset;
    //3. 判断JVM是否支持Long类型无锁CAS
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
    private static native boolean VMSupportsCS8();
    static {
        try {
            //4. 获取value在AtomicLong中的偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    //5. 实际变量值
    private volatile long value;
    public AtomicLong(long initialValue) {
        value = initialValue;
    }
   ......
}

首先通过Unsafe.getUnsafe()方法获取到 Unsafe 类的实例,


为什么可以获取到 Unsafe 类的实例?因为 AtomicLong 类也在 rt.jar 包下,所以可以通过 BootStrap 类加载器进行加载。


第二步、第四步获取 value 变量在 AtomicLong 类中的偏移量。


第五步的 value 变量被声明为了volatile,这是为了在多线程下保证内存可见性,而 value 存储的就是具体计数器的值。


递增和递减操作代码

接下来我们看一下 AtomicLong 中的主要函数。

//调用unsafe方法,原子性设置value值为原始值+1,返回递增后的值
public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
//调用unsafe方法,原子性设置value值为原始值-1,返回值递减后的值
public final long decrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}
//调用unsafe方法,原子性设置value值为原始值+1,返回原始值
public final long getAndIncrement() {
        return unsafe.getAndAddLong(this, valueOffset, 1L);
}
//调用unsafe方法,原子性设置value值为原始值-1,返回原始值
public final long getAndDecrement() {
        return unsafe.getAndAddLong(this, valueOffset, -1L);
}

上述代码都是通过调用 Unsafe 的getAndAddLong()方法来实现操作,这个函数是一个原子性操作,第一个参数为 AtomicLong 实例的引用,第二个参数是 value 变量在 AtomicLong 中的偏移值,第三个参数是要设置的第二个变量的值。


其中,getAndIncrement()方法在JDK1.7中的实现逻辑如下。

public final long getAndIncrement() {
   while (true) {
        long current = get();
        long next = current + 1;
        if (compareAndSet(current,next))
            return current;
    }
}

这段代码中,每个线程都是拿到变量的当前值(因为 value 是 volatile 变量,所以拿到的都是最新的值),然后在工作内存中进行增加 1 操作,之后使用CAS修改变量的值。如果设置失败,则一直循环尝试,直到设置成功。


JDK8中的逻辑为:

public final long getAndAddLong(Object var1, long var2, long var4) {
        long var6;
        do {
            var6 = this.getLongVolatile(var1, var2);
        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
        return var6;
}

可以看到,JDK1.7的 AtomicLong 中的循环逻辑已经被JDK8中的原子操作类 Unsafe 内置了,之所以内置应该是考虑到这个函数在其他地方也会用到,而内置可以提高复用性


compareAndSet(long expect, long update)方法

public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

如上代码我们可以知道,在内部还是调用了unsafe.compareAndSwapLong方法。如果原子变量中的 value 值等于 expect,则使用 update 值更新该值并返回 true,否则返回 false。


下面我们通过一个多线程使用 AtomicLong 统计 0 的个数的例子来加深理解。

/**
 * @author 神秘杰克
 * 公众号: Java菜鸟程序员
 * @date 2022/1/4
 * @Description 统计0的个数
 */
public class AtomicTest {
    private static AtomicLong atomicLong = new AtomicLong();
    private static Integer[] arrayOne = new Integer[]{0, 1, 2, 3, 0, 5, 6, 0, 56, 0};
    private static Integer[] arrayTwo = new Integer[]{10, 1, 2, 3, 0, 5, 6, 0, 56, 0};
    public static void main(String[] args) throws InterruptedException {
        final Thread threadOne = new Thread(() -> {
            final int size = arrayOne.length;
            for (int i = 0; i < size; ++i) {
                if (arrayOne[i].intValue() == 0) {
                    atomicLong.incrementAndGet();
                }
            }
        });
        final Thread threadTwo = new Thread(() -> {
            final int size = arrayTwo.length;
            for (int i = 0; i < size; ++i) {
                if (arrayTwo[i].intValue() == 0) {
                    atomicLong.incrementAndGet();
                }
            }
        });
        threadOne.start();
        threadTwo.start();
        //等待线程执行完毕
        threadOne.join();
        threadTwo.join();
        System.out.println("count总数为: " + atomicLong.get()); //count总数为: 7
    }
}

这段代码很简单,就是每找到一个 0 就会调用 AtomicLong 的原子性递增方法。


在没有原子类的时候,实现计数器需要一定的同步措施,例如 synchronized 关键字等,但这些都是阻塞算法,对性能有一定的影响,而我们使用的 AtomicLong 使用的是CAS 非阻塞算法,性能更好。


但是在高并发下,AtomicLong 还会存在性能问题,JDK8 提供了一个在高并发下性能更好的 LongAdder 类。


LongAdder 介绍

前面说过,在高并发下使用 AtomicLong 时,大量线程会同时竞争同一个原子变量,但是由于同时只有一个线程的 CAS 操作会成功,所以会造成大量线程竞争失败后,会无限循环不断的自旋尝试 CAS 操作,白白浪费 CPU 资源。


所以在 JDK8 中新增了一个原子性递增或者递减类 LongAdder 用来克服高并发 AtomicLong 的缺点。既然 AtomicLong 的性能瓶颈是多个线程竞争一个变量的更新产生的,那如果把一个变量分成多个变量,让多个线程竞争多个资源,是不是就解决性能问题了?是的,LongAdder 就是这个思路。

image.png

如上图,在使用 LongAdder 时,则是在内部维护多个 Cell 变量,每个 Cell 里面有一个初始值为 0 的 long 型变量,这样的话在同等并发量的情况下,争夺单个线程更新操作的线程会减少,也就变相的减少争夺共享资源的并发量。


另外,如果多个线程在争夺同一个 Cell 原子变量时失败了,它并不会一直自旋重试,而是去尝试其它 Cell 变量进行 CAS 尝试,这样就增加了当前线程重试 CAS 成功的可能性,最后,在获取 LongAdder 当前值时,是把所有的Cell变量的value值累加后再加上base返回的


LongAdder 维护了一个延迟初始化的原子性更新数组(默认情况下 Cell 数组是 null)和一个基值变量 base,在一开始时并不创建 Cells 数组,而是在使用时创建,也就是惰性加载


在一开始判断 Cell 数组是 null 并且并发线程减少时,所有的累加都是在 base 变量上进行的,保持 Cell 数组的大小为 2 的 N 次方,在初始化时 Cell 数组中的 Cell 元素个数为 2,数组里面的变量实体是 Cell 类型。Cell 类型是 AtomicLong 的一个改进,用来减少缓存的争用,也就是解决伪共享问题。


在多个线程并发修改一个缓存行中的多个变量时,由于只能同时有一个线程去操作缓存行,将会导致性能的下降,这个问题就称之为伪共享

一般而言,缓存行有 64 字节,我们知道一个 long 是 8 个字节,填充 5 个 long 之后,一共就是 48 个字节。

而 Java 中对象头在 32 位系统下占用 8 个字节,64 位系统下占用 16 个字节,这样填充 5 个 long 型即可填满 64 字节,也就是一个缓存行。

JDK8 以及之后的版本 Java 提供了sun.misc.Contended 注解,通过@Contented 注解就可以解决伪共享的问题。

使用@Contented 注解后会增加 128 字节的 padding,并且需要开启-XX:-RestrictContended选项后才能生效。

在 LongAdder 中解决伪共享的真正的核心就在Cell数组,Cell数组使用了@Contented注解。


对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律地分散在内存中的(也就是说多个原子性变量的内存地址是不连续的),多个原子变量被放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,所以数组内的多个元素能经常共享缓存行,因此这里使用@Contented注解对 Cell 类进行字节填充,这防止了数组中多个元素共享一个缓存行,在性能上是一个提升。


LongAdder 源码分析

问题:


  1. LongAdder 的结构是怎样的?
  2. 当前线程应该访问 Cell 数组里面的哪一个 Cell 元素?
  3. 如何初始化 Cell 数组?
  4. Cell 数组如何扩容?
  5. 线程访问分配的 Cell 元素有冲突后如何处理?
  6. 如何保证线程操作被分配的 Cell 元素的原子性?


接下来我们看一下 LongAdder 的结构:


LongAdder 类继承自 Striped64 类,在 Striped64 内部维护这三个变量。


  • LongAdder 的真实值其实是 base 的值与 Cell 数组里面所有 Cell 元素中的 value 值的累加,base 是个基础值,默认为 0。
  • cellsBusy 用来实现自旋锁,状态值只有 0 和 1,当创建 Cell 元素,扩容 Cell 数组或者初始化 Cell 数组时,使用 CAS 操作该变量来保证同时只有一个线程可以进行其中之一的操作。
transient volatile Cell[] cells;
transient volatile long base;
transient volatile int cellsBusy;
public class LongAdder extends Striped64 implements Serializable {

Cell 的构造

下面我们看一下 Cell 的构造。


@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
可以看到,内部维护一个被声明为 volatile 的变量,这里声明 volatile 是为了保证内存可见性。另外 cas 函数通过 CAS 操作,保证了当前线程更新时被分配的 Cell 元素中 value 值的原子性。并且可以看到 Cell 类是被@Contended 修饰的,避免伪共享。
至此我们已经知道了问题 1、6 的答案了。
sum()
sum()方法返回当前的值,内部操作是累加所有 Cell 内部的 value 值然后在累加 base。
由于计算总合时没有对 Cell 数组进行加锁,所以在累加过程中可能有其它线程对 Cell 值进行修改,也可能扩容,所以 sum 返回的值并不是非常准确的,其返回值并不是一个调用 sum()方法时原子快照值。
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

reset()

reset()方法为重置操作,将 base 设置为 0,如果 Cell 数组有元素,则元素被重置为 0。

public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                a.value = 0L;
        }
    }
}

sumThenReset()

sumThenReset()方法是 sum()方法的改造版本,该方法在使用 sum 累加对应的 Cell 值后,把当前的 Cell 和 base 重置为 0。


该方法存在在线程安全问题,比如第一个调用线程清空 Cell 的值,则后一个线程调用时累加的都是 0 值。
public long sumThenReset() {
    Cell[] as = cells; Cell a;
    long sum = base;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null) {
                sum += a.value;
                a.value = 0L;
            }
        }
    }
    return sum;
}

add(long x)

接下来我们主要看 add()方法,这个方法里面可以回答刚才其他的问题。

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    //(1)
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        //(2)
        if (as == null || (m = as.length - 1) < 0 ||
            //(3)
            (a = as[getProbe() & m]) == null ||
            //(4)
            !(uncontended = a.cas(v = a.value, v + x)))
            //(5)
            longAccumulate(x, null, uncontended);
    }
}
final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

该方法首先判断 cells 是否 null,如果为 null 则在 base 上进行累加。如果 cells 不为 null,或者线程执行代码 cas 失败,则去执行第二步。代码第二步第三步决定当前线程应该访问 cells 数组中哪一个 Cell 元素,如果当前线程映射的元素存在的话则执行代码四。


第四步主要使用 CAS 操作去更新分配的 Cell 元素的 value 值,如果当前线程映射的元素不存在或者存在但是 CAS 操作失败则执行代码五。


线程应该访问 cells 数组的哪一个 Cell 元素是通过 getProbe() & m 进行计算的,其中 m 是当前 cells 数组元素个数-1,getProbe()则用于获取当前线程中变量 threadLocalRandomProbe 的值,这个值一开始为 0,在代码第五步里面会对其进行初始化。并且当前线程通过分配的 Cell 元素的 cas 函数来保证对 Cell 元素 value 值更新的原子性。


现在我们已经明白了第二个问题。


下面我们看一下 longAccumulate(x,null,uncontended)方法,该方法主要是 cells 数组初始化和扩容的地方。

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    //6. 初始化当前线程变量ThreadLocalRandomProbe的值
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        //7.
        if ((as = cells) != null && (n = as.length) > 0) {
            //8.
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            //9. 当前Cell存在,则执行CAS设置
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            //10. 当前Cell元素个数大于CPU个数
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            //11. 是否有冲突
            else if (!collide)
                collide = true;
            //12. 如果当前元素个数没有达到CPU个数,并且存在冲突则扩容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                      //12.1
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    //12.2
                    cellsBusy = 0;
                }
                //12.3
                collide = false;
                continue;                   // Retry with expanded table
            }
            //13. 为了能够找到一个空闲的Cell,重新计算hash值,xorshift算法生成随机数
            h = advanceProbe(h);
        }
        //14. 初始化Cell数组
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    //14.1
                    Cell[] rs = new Cell[2];
                    //14.2
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                //14.3
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

该方法较为复杂,我们主要关注问题 3,问题 4,问题 5。


  1. 如何初始化 Cell 数组?
  2. Cell 数组如何扩容?
  3. 线程访问分配的 Cell 元素有冲突后如何处理?


每个线程第一次执行到代码第六步的时候,会初始化当前线程变量 ThreadLocalRandomProbe 的值,该值主要为了计算当前线程为了分配到cells数组中的哪一个cell元素中


cells 数组的初始化是在代码第十四步中进行,其中 cellsBusy 是一个标识,为 0 说明当前 cells 数组没有被初始化或者扩容,也没有在新建 Cell 元素,为 1 说明 cells 数组正在被初始化或者扩容、创建新元素,通过 CAS 来进行 0 或 1 状态切换,调用的是casCellsBusy()


假设当前线程通过 CAS 设置 cellsBuys 为 1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了,如代码(14.1)初始化 cells 数组个数为 2,然后使用h & 1计算当前线程应该访问 cell 数组的那个位置,使用的 h 就是当前线程的 threadLocalRandomProbe 变量。然后标识 Cells 数组以及被初始化,最后(14.3)重置了 cellsBusy 标记。虽然这里没有使用CAS操作,但是却是线程安全的,原因是cellsBusy是volatile类型的,保证了内存可见性。在这里初始化的 cells 数组里面的两个元素的值目前还是 null。现在我们知道了问题 3 的答案。


而 cells 数组的扩容是在代码第十二步进行的,对 cells 扩容是有条件的,也就是第十步、十一步条件都不满足后进行扩容操作。具体就是当前 cells 的元素个数小于当前机器 CPU 个数并且当前多个线程访问了 cells 中同一个元素,从而导致某个线程 CAS 失败才会进行扩容。


为何要涉及 CPU 个数呢?只有当每个 CPU 都运行一个线程时才会使多线程的效果最佳,也就是当 cells 数组元素个数与 CPU 个数一致时,每个 Cell 都使用一个 CPU 进行处理,这时性能才是最佳的。


代码第十二步也是先通过 CAS 设置 cellsBusy 为 1,然后才能进行扩容。假设 CAS 成功则执行代码(12.1)将容量扩充为之前的 2 倍,并复制 Cell 元素到扩容后数组。另外,扩容后 cells 数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素的值目前还是 null。现在我们知道了问题 4 的答案。


在代码第七步、第八步中,当前线程调用 add()方法并根据当前线程的随机数 threadLocalRandomProbe 和 cells 元素个数计算要访问的 Cell 元素下标,然后如果发现对应下标元素的值为 null,则新增一个 Cell 元素到 cells 数组,并且在将其添加到 cells 数组之前要竞争设置 cellsBusy 为 1。


而代码第十三步,对 CAS 失败的线程重新计算当前线程的随机值 threadLocalRandomProbe,以减少下次访问 cells 元素时的冲突机会。这里我们就知道了问题 5 的答案。


总结

该类通过内部 cells 数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可以同时对 cells 数组里面的元素进行并行的更新操作。另外,数组元素 Cell 使用@Contended 注解进行修饰,这避免了 cells 数组内多个原子变量被放入同一个缓存行,也就是避免了伪共享。


LongAccumulator 相比于 LongAdder,可以为累加器提供非 0 的初始值,后者只能提供默认的 0 值。另外,前者还可以指定累加规则,比如不进行累加而进行相乘,只需要在构造 LongAccumulator 时传入自定义的双目运算器即可,后者则内置累加的规则。
相关文章
|
21天前
|
数据可视化 数据挖掘 BI
团队管理者必读:高效看板类协同软件的功能解析
在现代职场中,团队协作的效率直接影响项目成败。看板类协同软件通过可视化界面,帮助团队清晰规划任务、追踪进度,提高协作效率。本文介绍看板类软件的优势,并推荐五款优质工具:板栗看板、Trello、Monday.com、ClickUp 和 Asana,助力团队实现高效管理。
45 2
|
7天前
|
人工智能 自然语言处理 Java
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
FastExcel 是一款基于 Java 的高性能 Excel 处理工具,专注于优化大规模数据处理,提供简洁易用的 API 和流式操作能力,支持从 EasyExcel 无缝迁移。
55 9
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
|
14天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
14天前
|
JSON Java Apache
Java基础-常用API-Object类
继承是面向对象编程的重要特性,允许从已有类派生新类。Java采用单继承机制,默认所有类继承自Object类。Object类提供了多个常用方法,如`clone()`用于复制对象,`equals()`判断对象是否相等,`hashCode()`计算哈希码,`toString()`返回对象的字符串表示,`wait()`、`notify()`和`notifyAll()`用于线程同步,`finalize()`在对象被垃圾回收时调用。掌握这些方法有助于更好地理解和使用Java中的对象行为。
|
12天前
|
Java 数据库连接 Spring
反射-----浅解析(Java)
在java中,我们可以通过反射机制,知道任何一个类的成员变量(成员属性)和成员方法,也可以堆任何一个对象,调用这个对象的任何属性和方法,更进一步我们还可以修改部分信息和。
|
1月前
|
存储 算法 Java
Java内存管理深度解析####
本文深入探讨了Java虚拟机(JVM)中的内存分配与垃圾回收机制,揭示了其高效管理内存的奥秘。文章首先概述了JVM内存模型,随后详细阐述了堆、栈、方法区等关键区域的作用及管理策略。在垃圾回收部分,重点介绍了标记-清除、复制算法、标记-整理等多种回收算法的工作原理及其适用场景,并通过实际案例分析了不同GC策略对应用性能的影响。对于开发者而言,理解这些原理有助于编写出更加高效、稳定的Java应用程序。 ####
|
2天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
31 17
|
12天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
14天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
14天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。