无锁队列
package untils; import java.lang.reflect.Field; import java.util.concurrent.atomic.AtomicInteger; import lombok.Data; import sun.misc.Unsafe; /** * Created on 2021-06-23 */ public class NoLockQueue { private static final Unsafe unsafe; //头节点 volatile Node head; //尾节点 volatile Node tail; //头节点偏移量 private static final Long headOffset; //尾节点偏移量 private static final Long tailOffset; //当前队列的长度 private AtomicInteger length = new AtomicInteger(0); //队列允许的最大长度 private int maxSize = 0; static { try { //获取成员变量 Field field = Unsafe.class.getDeclaredField("theUnsafe"); //设置为可访问 field.setAccessible(true); //是静态字段,用null来获取Unsafe实例 unsafe = (Unsafe) field.get(null); //设置头节点变量在类中的偏移值 headOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("head")); //设置尾节点变量在类中的偏移值 tailOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("tail")); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); throw new Error(e); } } public NoLockQueue() { this.maxSize = Integer.MAX_VALUE; //初始化节点 head = tail = new Node(); } public NoLockQueue(int maxSize) { this.maxSize = maxSize; //初始化节点 head = tail = new Node(); } /** * 入队 */ public void enQueue(int value) { //创建新节点 Node newNode = new Node(); newNode.setValue(value); while (true) { //获取尾节点 Node oldTail = this.tail; if (length.get() < maxSize && oldTail.casNext(null, newNode)) { System.out.println(Thread.currentThread().getName() + "进队列:" + value); unsafe.compareAndSwapObject(this, tailOffset, oldTail, newNode); length.incrementAndGet(); break; } } } /** * 出队 */ public void dequeue() { while (true) { //如果没有数据 if (length.get() <= 0) { continue; } //获取头节点 Node oldHead = this.head; Node oldNext = oldHead.getNext(); if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) { System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue()); length.decrementAndGet(); break; } } } } @Data class Node { //Unsafe类 private static final Unsafe unsafe; //next变量的偏移量 private static final Long nextOffset; //值 private volatile int value; //next节点 private volatile Node next; static { try { //获取成员变量 Field field = Unsafe.class.getDeclaredField("theUnsafe"); //设置为可访问 field.setAccessible(true); //是静态字段,用null来获取Unsafe实例 unsafe = (Unsafe) field.get(null); //获取state变量在类中的偏移值 nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next")); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); throw new Error(e); } } /** * cas的方式设置next的值 * @param before 期望的值 * @param after 修改的值 * @return */ public boolean casNext(Node before, Node after) { return unsafe.compareAndSwapObject(this, nextOffset, before, after); } }
测试类
package untils; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Created on 2021-06-23 */ public class Main { public static void main(String[] args) { NoLockQueue queue = new NoLockQueue(10); ExecutorService executorService = Executors.newFixedThreadPool(20); Random random = new Random(); for (int i = 0; i < 10; i++) { executorService.submit(new Runnable() { @Override public void run() { queue.enQueue(random.nextInt()); } }); } // //判断入队的顺序 // System.out.println("----------------"); // try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { } // Node p = queue2.head.getNext(); // while (p != null){ // System.out.println(p.getValue()); // p = p.getNext(); // } // // System.out.println("----------------"); for (int i = 0; i < 10; i++) { executorService.submit(new Runnable() { @Override public void run() { queue.dequeue(); } }); } try { TimeUnit.SECONDS.sleep(5); } catch (Exception e) { e.printStackTrace(); } finally { } executorService.shutdown(); } }
小插曲
unsafe类的获取
其实当时参考的是AtomicInteger里获取unsafe方法
private static final Unsafe unsafe = Unsafe.getUnsafe();
但是报错了,报错的原因竟然是 双亲委派模型
那怎么获取unsafe类呢,如下所示,固定格式
//获取成员变量 Field field = Unsafe.class.getDeclaredField("theUnsafe"); //设置为可访问 field.setAccessible(true); //是静态字段,用null来获取Unsafe实例 Unsafe unsafe = (Unsafe) field.get(null);
打印顺序不对,影响了代码的正确性
/** * 出队 */ public void dequeue() { while (true) { //如果没有数据 if (length.get() <= 0) { continue; } //获取头节点 Node oldHead = this.head; Node oldNext = oldHead.getNext(); if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) { //正确的位置 //System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue()); length.decrementAndGet(); //错误的位置 System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue()); break; } } }
一开始我在错误的位置打印,发现入队和出队的顺序不一样,后来换了一个位置试了一下,好了。最后还是分析一下为什么吧。
比如此时此刻 队列里有2个元素A和B。现在2个线程按照下面的顺序执行,其实理论上出队顺序是没有问题的,只不过后面的先打印了,给了一种先出队的错觉。
收获
其实这个里面使用AtomicReference实现的,主要想用他的cas;但是我感觉有些绕,所以就自己用unsafe类实现cas。
断断续续写了一天才写了一个demo,其实不亏,至少写出来了。