java并发

简介: 1: 并发编程基础 如何减少上下文切换:: 使用无锁并发编程 CAS算法 使用最少线程 使用协程 使用无锁并发编程: 如将数据的ID按照HASH算法取摸分段,不同的线程处理不同的数据.(分段锁) CAS算法: Java的Atomic包使用的CAS算法来更新数据,而不需要加锁.

1: 并发编程基础

  • 如何减少上下文切换: : 使用无锁并发编程 CAS算法 使用最少线程 使用协程
    使用无锁并发编程: 如将数据的ID按照HASH算法取摸分段,不同的线程处理不同的数据.(分段锁)

CAS算法: Java的Atomic包使用的CAS算法来更新数据,而不需要加锁.
使用最少线程: 避免创建不需要的线程.比如任务少的时候,但是线程数太多.
使用协程 在单线程里实现多任务调度,并在单线程里维持多个任务的切换.

  • 避免死锁的几个方法: (1)避免一个线程同时获取多个锁.(2)避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源.(3)尝试使用定时锁,使用lock.tryLock(timeout)来代替内部锁.(4)对于数据库锁,加锁和解锁必须在一个数据库链接里.否则会出现解锁失败的情况.
  • volatile比synchronize轻量级.但只保证了可见性,没有保证原子性, 并且volatile不会引起线程的上下文切换和调度.
术语 英文单词 术语描述
内存屏障 memory barriers 是一组处理器指令,用于实现对内存操作的顺序限制
缓冲行 cache line 缓存中可以分配的最小存储单位.处理器填写缓存线时会加载整个缓存线,需要使用多个主内存读周期
原子操作 atomic operations 不可中断的一个或一系列操作
缓存行填充 cache line fill 当处理器识别到从内存中读取操作数是可缓存的,处理器读取整个缓存行到合适的缓存(L1 L2 L3的或所有)
缓存命中 cache hit 如果进行高速缓存行填充操作的内存位置仍然是下次处理器访问的地址时,处理从缓存中读取而不是从内存读取
写命中 write hit 当处理器将操作数写回到一个内存缓存区时,他首先会检查这个缓存的内存地址是否在缓存中,如果存在一个高效的缓存行,则处理器将这个操作数写回到缓存而不是写回到内存
写缺失 write misses the cache 一个有效的缓存行被写入到不存在的内存区域
  • Java中每个对象都可以作为锁, 具体表现为三种形式 (1)对于普通方法,锁是当前的实例(2)对于静态方法,锁是当前类的Class对象(3)对于同步方法块,锁是Synchronize括号里配置的对象.
  • JVM基于进入和退出Monitor对象来实现方法同步和代码同步, 代码同步: 使用monitorenter 和 monitorexit指令实现.而方法同步使用另外一种方式.细节在JVM规范里面没讲,但是,方法同步同样可以使用这两个指令来实现. monitorenter指令在编译后插入到同步代码块开始的位置,而monitorexit是插入到方法结束处和异常处

    • java1.6中 ,锁一共有四中状态: 无锁状态,偏向锁状态,轻量级锁状态和重量级状态 (依次从低到高)
  • 偏向锁: ,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁. 当一个线程访问同步代码块并获取锁时,会在对象头和帧栈中记录里存储锁偏向的线程ID,以后该线程在进入和退出同步快时不需要进行CAS操作来加锁和解锁,只需要简单的测试一下对象头的MarkWord里是否存储着指向当前线程的偏向锁.如果测试成功.表示该线程已经获取锁.
    偏向锁使用了一种等到竞争出现才释放锁的机制,所以其他线程尝试竞争偏向锁时,持有偏向锁的线程才回释放,
  • 轻量级锁: 线程在执行同步代码块之前,JVM会先在当前线程的帧栈中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中. 称为: Displaced Mark Word,然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针.如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁.
优点 缺点 使用场景
偏向锁 加锁和解锁不需要额外消耗,和执行非同步方法相比存在纳秒级差别 如果线程间存在锁竞争,会带来额外的锁撤销消耗 适用于只有一个线程访问同步块的场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁竞争的线程,会使用自旋消耗CPU 追求响应时间,同步快执行速度快
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间慢 追求吞吐量,同步快执行速度长
  • CAS实现原子操作的三大问题: (1)ABA问题(2)循环时间长开销大(3)只能保证一个贡献变量的原子操作. ABA问题在java1.5之后提供了一个AtomicStampedReference来解决ABA问题.这个类的compareAndSet首先检查当前引用是否等于预期引用,并检查当前标志是否等于预期标志.如果全部相等,则以原子方式更新.
  • 同步: 指程序中用于控制不同线程间操作发生相对顺序的机制.
  • volatile:(1) 可见性, 对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入(2)原子性: 对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性.
  • 公平锁和非公平锁:(1)公平锁和非公平锁释放时最后都要写一个volatile变量state (2)公平锁获取时,首先去读volatile变量(state) (3) 非公平锁获取时,首先会用CAS更新volatile变量,这个操作同时具有volatile读和volatile写的内存语义
  • concurrent包的实现示意图:
    ---------------------------------------------------------------

Lock | 同步器 | 阻塞队列 | Executor | 并发容器|
---------------------------------------------------------------

  AQS  | 非阻塞数据结构 | 原子变量类 |

---------------------------------------------------------------
volatile变量的读/写 CAS
---------------------------------------------------------------

  • final域的内存语义: 对final域的读和写更像是普通变量的访问.
  • 抛出InterruptedExeception的线程SleepThread. 其中断标识位被清除,而一直忙碌的线程BusyThread,中断标志位没有清除
public class Demo {

    public static void main(String[] args) {

        Thread sleepThread = new Thread(new SleepRunner(),"SleepThread");
        sleepThread.setDaemon(true);

        Thread busyThread = new Thread(new BusyRunner(),"BusyThread");
        busyThread.setDaemon(true);

        sleepThread.start();
        busyThread.start();

        sleep(5);
        sleepThread.interrupt();
        busyThread.interrupt();

        System.out.println("Sleep: "+sleepThread.isInterrupted());
        System.out.println("Busy: "+busyThread.isInterrupted());

    }
    static class SleepRunner implements Runnable{
        @Override
        public void run() {
            while (true){
                sleep(10);
            }
        }
    }
    static class BusyRunner implements Runnable{
        @Override
        public void run() {
            while (true);
        }
    }
}

Exception in thread "SleepThread" java.lang.IllegalStateException: java.lang.InterruptedException: sleep interrupted
    at java9demo/com.java9.utils.ConcurrentUtils.sleep(ConcurrentUtils.java:34)
    at java9demo/com.java9.artcp.Demo$SleepRunner.run(Demo.java:30)
    at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at java.base/java.lang.Thread.sleep(Thread.java:340)
    at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:401)
    at java9demo/com.java9.utils.ConcurrentUtils.sleep(ConcurrentUtils.java:32)
    ... 2 more
Sleep: false
Busy: true
  • 过期的suspend resume stop: suspend: 在调用后,线程不会释放已经占有的资源(比如锁),而是占着资源进入睡眠状态,这样容易引发死锁问题, stop方法会在终结一个线程时不会保证线程的资源正确释放,通常是没有给予线程完成资源释放工作的机会.因此会导致程序可能工作在不确定的状态下.
    static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
    public static void main(String[] args) {

        Thread printThread = new Thread(new Runner(),"PrintThread");
        printThread.setDaemon(true);
        printThread.start();
        sleep(3);

        printThread.suspend();
        System.out.println("main suspend at: "+LocalTime.now().format(formatter));
        sleep(3);
        printThread.resume();
        System.out.println("main resume at: "+LocalTime.now().format(formatter));
        sleep(3);
        printThread.stop();
        System.out.println("main stop at: "+LocalTime.now().format(formatter));
        sleep(3);
    }
    static class Runner implements Runnable{
        @Override
        public void run() {
            while (true){
                System.out.println(Thread.currentThread().getName()+" Run at: "+ LocalTime.now().format(formatter));
                sleep(1);
            }
        }
    }
  • 终止线程: (1)中断方式,(2)利用一个boolean变量来控制是否需要停止任务并终止该线程.
public static void main(String[] args) {
        Runner one = new Runner();
        Thread countThread = new Thread(one,"CountThread");
        countThread.start();
        sleep(1);
        countThread.interrupt();
        Runner two = new Runner();
        countThread = new Thread(two,"CountThread");
        countThread.start();
        sleep(1);
        two.cancel();
    }
    static class Runner implements Runnable{
        private long i;
        private volatile boolean on = true;
        public void cancel(){on = false;}
        @Override
        public void run() {
            while (on && !Thread.currentThread().isInterrupted()){
                i++;
            }
            System.out.println("count i= "+i);
        }
    }
  • 对同步快的实现使用了monitorenter 和moniterexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZE来完成的. 无论采用哪种方式.其本质是对一个对象的监视器(moniter)进行获取,而这个获取是排他的.也就是同一时刻只能有一个线程获取到synchronize所保护的监视器
public static void main(String[] args) {
        synchronized (Demo.class){

        }
        m();
    }
    public static synchronized void m(){

    }
/------------------------------------------------------------------------------------------
public com.java9.artcp.Demo();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=1, locals=1, args_size=1
         0: aload_0
         1: invokespecial #1                  // Method java/lang/Object."<init>":()V
         4: return
      LineNumberTable:
        line 3: 0
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0       5     0  this   Lcom/java9/artcp/Demo;

  public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=3, args_size=1
         0: ldc           #2                  // class com/java9/artcp/Demo
         2: dup
         3: astore_1
         4: monitorenter
         5: aload_1
         6: monitorexit
         7: goto          15
        10: astore_2
        11: aload_1
        12: monitorexit
        13: aload_2
        14: athrow
        15: invokestatic  #3                  // Method m:()V
        18: return
      Exception table:
         from    to  target type
             5     7    10   any
            10    13    10   any
      LineNumberTable:
        line 6: 0
        line 8: 5
        line 9: 15
        line 10: 18
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      19     0  args   [Ljava/lang/String;
      StackMapTable: number_of_entries = 2
        frame_type = 255 /* full_frame */
          offset_delta = 10
          locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
          stack = [ class java/lang/Throwable ]
        frame_type = 250 /* chop */
          offset_delta = 4

  public static synchronized void m();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:
      stack=0, locals=0, args_size=0
         0: return
      LineNumberTable:
        line 13: 0
}
SourceFile: "Demo.java"
  • wait and notify的两种方式: 分为 等待方(消费者)通知方(生产者)
    等待方的原则: (1) 获取对象锁 (2)如果条件不满足,那么调用对象的wait方法,被通知后仍要检查条件(3)条件满足则执行对应的逻辑.

通知方(生产者): (1)获得对象锁(2)改变条件(3)通知所有等待在对象上的线程

`等待方(消费者)`
synchronized (对象){
    while(条件不满足){
        对象.wait();
    }
    对应的处理逻辑.
}
`通知方(生产者)`
synchronized (对象){
    改变条件
    对象.notifyAll();
}
  • Thread.join():当前线程A等待thread线程终止之后才从thread.join返回.例子是: 创建了10个线程,编号0-9,每个线程调用前一个线程的join方法,也就是线程0结束了,线程1才能从join方法中返回,而线程0需要等待main线程结束.
public static void main(String[] args) {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; ++i) {
            Thread thread = new Thread(new Domion(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
    }
    static class Domion implements Runnable{
        private Thread thread;

        public Domion(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" terminate.");
        }
    }
/----------------------------------------------------------------------------------
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.
  • ThreadLocal的用法: 例如在AOP中,可以在方法调用前的切入点执行begin而在方法调用之后切入点执行end()方法.
public class Profiler {
    private static final ThreadLocal<Long> TIME_THREADLOCAL = ThreadLocal.withInitial(System::currentTimeMillis);
    public static final void begin(){
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }
    public static final long end(){
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }

    public static void main(String[] args) {
        Profiler.begin();
        sleep(1);
        System.out.println("Cost: "+Profiler.end()+" mills");
    }
}

一个简单的线程池实例

public interface ThreadPool<Job extends Runnable> {

    void execute(Job job);

    void shutdown();

    void addWorkers(int num);

    void removeWorker(int num);

    int getJobSize();
}
package com.java9.artcp;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    //线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    //线程池默认数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    //线程池最小数量
    private static final int MIN_WORKER_NUMBERS = 5;
    //这是一个工作列表,回想里面插入工作.
    private final LinkedList<Job> jobs = new LinkedList<>();
    //工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
    //工作者线程数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    //线程编号生成
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool(){
        initWorkers(DEFAULT_WORKER_NUMBERS);
    }
    public DefaultThreadPool(int num){
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initWorkers(num);
    }
    @Override
    public void execute(Job job) {
        if(job != null){
            //添加一个工作,然后进行通知
            synchronized (jobs){
                jobs.add(job);
                jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        workers.forEach(Worker::shutdown);
    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobs){
            //限制新增的worker数量不能超过最大值
            if(num + this.workerNum > MAX_WORKER_NUMBERS){
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initWorkers(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobs){
            if(num >= this.workerNum){
                throw new IllegalArgumentException("beyond workNum");
            }
            //按照给定的数量停止worker
            int count =0;
            while (count < num){
                Worker worker = workers.get(count);
                if(workers.remove(worker)){
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }

    //初始化线程工作者
    private void initWorkers(int num){
        for (int i = 0; i < num; ++i) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet());
            thread.start();
        }
    }

    //工作者,负责消费任务
    class Worker implements Runnable{
        //是否工作
        private volatile boolean running = true;

        public void shutdown(){running = false;}

        @Override
        public void run() {
            while (running){
                Job job = null;
                synchronized (jobs){
                    //如果工作者列表为空,那么久wait
                    while (jobs.isEmpty()){
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            //感知到外部对workerThread的中断操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    //取出一个job
                    job = jobs.removeFirst();
                }
                if(job != null){
                    try {
                        job.run();
                    } catch (Exception e) {
                        //忽略运行中的异常
                    }
                }
            }
        }
    }
}
  • 同步队列器(AbstractQueuedSynchronizer) [AQS]: 是用来固件锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同状态,通过内置的FIFO队列来完成资源获取线程的排队工作. 主要使用的方式是继承 ,子类通过继承同步器来实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态的更改.这时就需要使用同步器提供的三个方法: (getState, setState(int newState)和 compareAndSetState(int expect,int update))来进行操作,因为他们能够保证状态的改变是安全的,子类推荐被定义为自定义组件的静态内部类,同步器自身没有实现任何同步接口,他仅仅定义了若干同步状态获取和释放的方法来供自定义同步组件使用 同步器既可以支持独占式获取同步状态,也可以支持共享式的获取同步状态 --> 实现的不同形式: ReentrantLock ReentrantReadWriteLock和CountDownLatch等.
  • 同步器是实现锁的关键, 锁是面向使用者的,它自定义了使用者与锁交互的接口(比如可以允许两个线程并行访问).隐藏了实现细节 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待和唤醒等底层操作
方法名称 描述
boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后进行CAS设置同步状态
boolean tryRelase(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
boolean int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功反之获取失败
boolean tryRelaseShared(int arg) 共享式释放同步状态
boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
package com.java9.artcp;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {
    //静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer{
        //是否处于独占状态
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        //当状态为0的时候获取锁
        @Override
        protected boolean tryAcquire(int acquires) {
            if(compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //释放锁,将状态设置为0
        @Override
        protected boolean tryRelease(int release) {
            if(getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        Condition newCondition(){return new ConditionObject();}
    }
    //仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
/---------------------------------------------------------------------------------------------------------
private static Mutex mutex = new Mutex();
    private static Condition condi1 = mutex.newCondition();
    private static Condition condi2 = mutex.newCondition();
    private static Condition condi3 = mutex.newCondition();
    private static volatile long count = 1;
    public static void main(String[] args) {
        Print1 print1 = new Print1();
        Print2 print2 = new Print2();
        Print3 print3 = new Print3();
        print3.start();
        print2.start();
        print1.start();

    }
    private static class Print1 extends Thread{
        @Override
        public void run() {
            while (true){
                mutex.lock();
                try {
                    while (count != 1){
                        condi1.await();
                    }
                    System.out.println("111");
                    sleep(1000);
                    count=2;
                    condi2.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    mutex.unlock();
                }
            }
        }
    }
    private static class Print2 extends Thread{
        @Override
        public void run() {
            while (true){
                mutex.lock();
                try {
                    while (count != 2){
                        condi2.await();
                    }
                    System.out.println("222");
                    sleep(1000);
                    count=3;
                    condi3.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    mutex.unlock();
                }
            }
        }
    }
    private static class Print3 extends Thread{
        @Override
        public void run() {
            while (true){
                mutex.lock();
                try {
                    while (count != 3){
                        condi3.await();
                    }
                    System.out.println("333");
                    sleep(1000);
                    count=1;
                    condi1.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    mutex.unlock();
                }
            }
        }
    }

在获取同步状态时,同步器维护了一个同步队列(FIFP),获取状态失败的线程都会被加入到队列中并在队列中进行自旋,移出队列(或停止自旋)的条件是前驱节点为头结点且成功获取了同步状态.在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态.然后唤醒头结点 的后继节点

  • 设计一个同步工具,该工具在同一时刻,只允许至多两个线程同时访问,超过2个线程的访问将被阻塞
public class TwinsLock implements Lock {
    private static class Sync extends AbstractQueuedSynchronizer{
        public Sync(int count){
            if(count <= 0) throw new IllegalArgumentException("count must large than zero");
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for(;;){
                int current = getState();
                int newCount = current-reduceCount;
                if(newCount<0 || compareAndSetState(current,newCount)){
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for(;;){
                int current = getState();
                int newCount = current+returnCount;
                if(compareAndSetState(current,newCount)){
                    return true;
                }
            }
        }
        Condition getCondition(){return new ConditionObject();}
    }
    private final Sync sync = new Sync(2);
    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) > 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.getCondition();
    }
}
/-------------------------------------------------------------------------------------------
private static TwinsLock lock = new TwinsLock();

    public static void main(String[] args) {
        //开启10个线程
        for (int i = 10; i < 100; ++i) {
            Worker worker=new Worker();
            worker.setDaemon(true);
            worker.setName("thread-"+i);
            worker.start();
        }
        //每隔一秒换行
        for (int i = 0; i < 10; ++i) {
            sleep(1);
            System.out.println();
        }
    }
    static class Worker extends Thread{

        @Override
        public void run() {
            while (true){
                lock.lock();
                try {
                    sleep(1000);
                    System.out.println("--> "+Thread.currentThread().getName());
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        }
    }
  • 阻塞队列(BlockingQueu):
方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(2) put(e) offer(e,time,unit)
移除方法 remove(e) poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • ArrayBlockingQueue : 一个由数组组成的有界阻塞队列
  • LinkedBlockingQueue :一个由链表组成的有界阻塞队列,最大长度Integer.MAX_VALUE
  • PriorityBlockingQueue : 支持优先级排序的无界阻塞队列
  • DelayQueue : 一个一个使用优先级队列实现的无界阻塞队列
  • SynchronousQueue : 一个不存储元素的阻塞队列
  • LinkedTransferQueue: 一个由链表构成的无界阻塞队列
  • LinkedBlockingDeque 一个由链表组成的双向阻塞队列
  • ForkJoin 框架
public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(1,10000);
        ForkJoinTask<Integer> res = forkJoinPool.submit(task);
        try {
            System.out.println(res.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    static class CountTask extends RecursiveTask<Integer>{
        private static final int THRESHOLD = 2000;//阈值
        private int start,end;

        public CountTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            //如果任务足够小就计算任务
            boolean canCompute = (end-start) <= THRESHOLD;
            if(canCompute){
                for (int i=start;i<=end;i++){
                    sum += i;
                }
            }else {
                //如果任务大于阈值,就分裂成2个子任务
                int middle = (start+end)/2;
                CountTask left = new CountTask(start,middle);
                CountTask right = new CountTask(middle+1,end);
                //执行子任务
                left.fork();right.fork();
                //等待子任务执行完,并得到其结果
                int leftResult = left.join();
                int rightResult = right.join();
                //合并子任务
                sum = leftResult + rightResult;
            }
            return sum;
        }
    }
  • 原子类 :AtomicIntegerArray : 原子更新数组 , AtomicLongArray ,AtomicReferenceArray:原子更新引用类型数组里的元素.
    需要注意的是: 数组value通过构造方法传递进去,然后AtomicIntegerArray会将当前数组复制一份,所以当AtomicInterArra对内部的数组元素进行修改时,不会影响传入的数组.

    static int[] value = new int[]{1,2};
    static AtomicIntegerArray ai = new AtomicIntegerArray(value);
    public static void main(String[] args) {
        ai.getAndSet(0,3);
        System.out.println(ai.get(0)); //3
        System.out.println(value[0]); //1
    }
  • CyclicBarrier 的例子:
public class CyclicBarrierDemo implements Runnable{
    //创建4个屏障,处理完之后执行当前类的run方法
    private CyclicBarrier c = new CyclicBarrier(4,this);
    private Executor executor = Executors.newFixedThreadPool(4);
    private ConcurrentHashMap<String,Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
    public  void count(){
        for (int i = 0; i < 4; ++i) {
            executor.execute(() -> {
                System.out.println("-> ");
                //计算当前的sheet的流水数据.
                sheetBankWaterCount.put(Thread.currentThread().getName(),1);
                //计算完插入一个屏障
                try {
                    c.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    @Override
    public void run() {
        int result = sheetBankWaterCount.values().stream().reduce(0,Integer::sum);
        //输出结果
        sheetBankWaterCount.put("result",result);
        System.out.println("result: "+result);
    }

    public static void main(String[] args) {

        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        demo.count();

    }
}
相关文章
|
1月前
|
存储 安全 算法
解读 Java 并发队列 BlockingQueue
解读 Java 并发队列 BlockingQueue
19 0
|
2月前
|
监控 安全 算法
Java并发基础:LinkedTransferQueue全面解析!
LinkedTransferQueue类实现了高效的线程间数据传递,支持等待匹配的生产者-消费者模式,基于链表的无界设计使其在高并发场景下表现卓越,且无需担心队列溢出,丰富的方法和良好的可扩展性满足了各种复杂应用场景的需求。
Java并发基础:LinkedTransferQueue全面解析!
|
2月前
|
Java 程序员 API
Java并发基础:concurrent Flow API全面解析
java.util.concurrent.Flow定义了响应式编程的核心接口,促进了Java在异步数据处理和背压机制方面的标准化,这使得第三方库如Reactor和RxJava能够基于这些接口提供丰富的实现和功能,同时简化了响应式编程在Java中的使用,Flow API增强了Java在并发编程领域的灵活性,使得处理异步数据流变得更加自然和高效。
108 0
Java并发基础:concurrent Flow API全面解析
|
1月前
|
存储 缓存 算法
Java并发基础:原子类之AtomicMarkableReference全面解析
AtomicMarkableReference类能够确保引用和布尔标记的原子性更新,有效避免了多线程环境下的竞态条件,其提供的方法可以轻松地实现基于条件的原子性操作,提高了程序的并发安全性和可靠性。
Java并发基础:原子类之AtomicMarkableReference全面解析
|
2天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
3 0
|
3天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
21 0
|
11天前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型
|
18天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
23天前
|
缓存 NoSQL Java
Java项目:支持并发的秒杀项目(基于Redis)
Java项目:支持并发的秒杀项目(基于Redis)
26 0
|
24天前
|
算法 安全 Java
Java中的并发编程:理解并发性能优化
在当今软件开发领域,多核处理器的普及使得并发编程变得更加重要。本文将深入探讨Java中的并发编程,介绍并发性能优化的关键技术,帮助开发人员更好地利用多核处理器提升应用程序性能。