java实现无锁队列

简介: 写作目的 说到无锁,其实就是用cas,不过我在百度上搜java实现无锁队列的文章其实不多,所以自己用cas和volatile实现一下,线程安全那是必须的。

无锁队列


package untils;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
import sun.misc.Unsafe;
/**
 * Created on 2021-06-23
 */
public class NoLockQueue {
    private static final Unsafe unsafe;
    //头节点
    volatile Node head;
    //尾节点
    volatile Node tail;
    //头节点偏移量
    private static final Long headOffset;
    //尾节点偏移量
    private static final Long tailOffset;
    //当前队列的长度
    private AtomicInteger length = new AtomicInteger(0);
    //队列允许的最大长度
    private int maxSize = 0;
    static {
        try {
            //获取成员变量
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            //设置为可访问
            field.setAccessible(true);
            //是静态字段,用null来获取Unsafe实例
            unsafe = (Unsafe) field.get(null);
            //设置头节点变量在类中的偏移值
            headOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("head"));
            //设置尾节点变量在类中的偏移值
            tailOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("tail"));
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
            throw new Error(e);
        }
    }
    public NoLockQueue() {
        this.maxSize = Integer.MAX_VALUE;
        //初始化节点
        head = tail = new Node();
    }
    public NoLockQueue(int maxSize) {
        this.maxSize = maxSize;
        //初始化节点
        head = tail = new Node();
    }
    /**
     * 入队
     */
    public void enQueue(int value) {
        //创建新节点
        Node newNode = new Node();
        newNode.setValue(value);
        while (true) {
            //获取尾节点
            Node oldTail = this.tail;
            if (length.get() < maxSize && oldTail.casNext(null, newNode)) {
                System.out.println(Thread.currentThread().getName() + "进队列:" + value);
                unsafe.compareAndSwapObject(this, tailOffset, oldTail, newNode);
                length.incrementAndGet();
                break;
            }
        }
    }
    /**
     * 出队
     */
    public void dequeue() {
        while (true) {
            //如果没有数据
            if (length.get() <= 0) {
                continue;
            }
            //获取头节点
            Node oldHead = this.head;
            Node oldNext = oldHead.getNext();
            if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) {
                System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue());
                length.decrementAndGet();
                break;
            }
        }
    }
}
@Data
class Node {
    //Unsafe类
    private static final Unsafe unsafe;
    //next变量的偏移量
    private static final Long nextOffset;
    //值
    private volatile int value;
    //next节点
    private volatile Node next;
    static {
        try {
            //获取成员变量
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            //设置为可访问
            field.setAccessible(true);
            //是静态字段,用null来获取Unsafe实例
            unsafe = (Unsafe) field.get(null);
            //获取state变量在类中的偏移值
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
            throw new Error(e);
        }
    }
    /**
     * cas的方式设置next的值
     * @param before 期望的值
     * @param after 修改的值
     * @return
     */
    public boolean casNext(Node before, Node after) {
        return unsafe.compareAndSwapObject(this, nextOffset, before, after);
    }
}


测试类


package untils;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Created on 2021-06-23
 */
public class Main {
    public static void main(String[] args) {
        NoLockQueue queue = new NoLockQueue(10);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    queue.enQueue(random.nextInt());
                }
            });
        }
        //        //判断入队的顺序
        //        System.out.println("----------------");
        //        try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
        //        Node p = queue2.head.getNext();
        //        while (p != null){
        //            System.out.println(p.getValue());
        //            p = p.getNext();
        //        }
        //
        //        System.out.println("----------------");
        for (int i = 0; i < 10; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    queue.dequeue();
                }
            });
        }
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        executorService.shutdown();
    }
}


小插曲


unsafe类的获取


其实当时参考的是AtomicInteger里获取unsafe方法


private static final Unsafe unsafe = Unsafe.getUnsafe();


但是报错了,报错的原因竟然是 双亲委派模型


那怎么获取unsafe类呢,如下所示,固定格式


//获取成员变量
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            //设置为可访问
            field.setAccessible(true);
            //是静态字段,用null来获取Unsafe实例
            Unsafe unsafe = (Unsafe) field.get(null);


打印顺序不对,影响了代码的正确性


/**
     * 出队
     */
    public void dequeue() {
        while (true) {
            //如果没有数据
            if (length.get() <= 0) {
                continue;
            }
            //获取头节点
            Node oldHead = this.head;
            Node oldNext = oldHead.getNext();
            if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) {
                //正确的位置
                //System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue());
                length.decrementAndGet();
                //错误的位置
                System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue());
                break;
            }
        }
    }


一开始我在错误的位置打印,发现入队和出队的顺序不一样,后来换了一个位置试了一下,好了。最后还是分析一下为什么吧。


比如此时此刻 队列里有2个元素A和B。现在2个线程按照下面的顺序执行,其实理论上出队顺序是没有问题的,只不过后面的先打印了,给了一种先出队的错觉。


收获


其实这个里面使用AtomicReference实现的,主要想用他的cas;但是我感觉有些绕,所以就自己用unsafe类实现cas。


断断续续写了一天才写了一个demo,其实不亏,至少写出来了。

目录
相关文章
|
2月前
|
前端开发 Java
java中的Queue队列的用法
java中的Queue队列的用法
19 1
|
2月前
|
存储 安全 算法
解读 Java 并发队列 BlockingQueue
解读 Java 并发队列 BlockingQueue
22 0
|
4月前
|
Java
队列(JAVA)
队列:只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有先进先出的性质。
21 0
|
5月前
|
Java
225. 用队列实现栈 --力扣 --JAVA
请你仅使用两个队列实现一个后入先出(LIFO)的栈,并支持普通栈的全部四种操作(push、top、pop 和 empty)。 实现 MyStack 类: void push(int x) 将元素 x 压入栈顶。 int pop() 移除并返回栈顶元素。 int top() 返回栈顶元素。 boolean empty() 如果栈是空的,返回 true ;否则,返回 false 。
45 1
|
7月前
|
存储 消息中间件 缓存
Java数据结构第三讲-栈/队列
Java数据结构第三讲-栈/队列
|
5天前
|
存储 安全 Java
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列
|
7天前
|
存储 Java C++
Java集合篇之深度解析Queue,单端队列、双端队列、优先级队列、阻塞队列
Java集合篇之深度解析Queue,单端队列、双端队列、优先级队列、阻塞队列
18 0
|
15天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
25 0
|
3月前
|
存储 安全 Java
JAVA常用队列类
JAVA常用队列类
|
7月前
|
存储 Java 调度
Java 最常见的面试题:队列和栈是什么?有什么区别?
Java 最常见的面试题:队列和栈是什么?有什么区别?