ConcurrentLinkedQueue 目标: 线程安全的队列,并发性能好。 实现: 基于链表的队列,ConcurrentLinkedQueue
,使用volatile
,CAS
和HOPS
变量(LockFree),性能较好,而且可以通过HOPS
变量调节性能.
初始时:
1 2 3 4 5 6 private transient volatile Node<E>tail = head;private transient volatile Node<E> tail;public ConcurrentLinkedQueue () { head = tail = new Node<E>(null ); }
simple解法(假想方案) 一种simple的实现方案是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public <E> boolean offer (E e) { if (e == null ) { throw new NullPointerException(); } Node<E> n = new Node(e); for (; ; ) { Node<E> t = tail; if (t.casNext(null , n) && casTail(t, n)) { return true ; } } }
回想我们的目标:
安全: 由于tail是volatile
的,因此可以通过CAS操作保证并发写的安全;
性能: 每次入列至少需要:1 2 3 4 1次 volatile读;(tail) 2次 CAS操作; 2次 volatie写.(tail.next和tail) // tail.next实际上也是volatile变量。
jdk注释 直接阅读源码有可能误解了作者的意思,而且源码确实比较trick,看不太懂,注释里声明的几个基本点:
1 2 3 4 5 6 7 8 9 10 1. 仅有一个节点的next为null,它是队列的最后一个节点. 想访问最后一个节点的话,可以通过tail节点往后找,可以通过O(1)时间找到(最多4次); // 注意tail不一定指向最后一个节点 2. 队列没有null的实际数据(item = null),也就是容器不接受空元素。 // dumpHead头节点的item为null; 移除节点的时候把item置为null,所以null是一个很重要的标志量,所以不允许实际数据有null造成冲突; 3. 为了满足上一条,更为了gc,过期的节点的next指向自己; // 所以很多节点遇到p==p.next的时候,会跳转到head重新进入有效节点范围,或者进行过期逻辑处理; 4. 把item置为null相当于移除了该数据,迭代器iterator会跳过item为null的节点.
实际解法 这个思路最重要的有两点: 一点就是updateHead函数,离群节点的next置为自己; 另一点就是tail
不一定是最后一个节点. 由于volatile
写的开销远远高于volatile
读,jdk中的解法是:
1 2 3 1. 假设队列中最后一个节点为:realTail; 2. tail 不一定等于 realTail; 3. 仅当tail与realTail距离大于等于HOPS时,才更新tail=realTail.
这样设计以后,可以通过增加volatile读减少volatile写。
具体实现:
updateHead方法 1 2 3 4 5 6 7 final void updateHead (Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
offer方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public boolean offer (E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null ) { if (p.casNext(null , newNode)) { if (p != t) casTail(t, newNode); return true ; } } else if (p == q) p = (t != (t = tail)) ? t : head; else p = (p != t && t != (t = tail)) ? t : q; } }
其中有一种trick:
1 2 3 4 5 6 7 8 9 t != (t = tail) if (t!=tail){t= tail; } else {}
因此p = (t != (t = tail)) ? t : head;
的实际含义就是:
1 2 3 4 5 6 7 8 9 10 11 if (t!=tail){ t=tail; p=t; } else { p=head; } 1 . 如果t!=tail,说明t过时了,tail可能还可用,因此尝试从tail重连;(t=tail,p=t);2 . 如果t==tail,说明t和tail都过时了,尝试从head重连。(p=head)
使用这个trick的目的不得而知,个人猜测是因为效率更高,代码更精炼。毕竟非阻塞并发存在的意义就是性能,而不是可读性,如果想要可读性,就去用阻塞队列了。 可读性下降了.但如果用得熟练的话,肉眼parse功力上涨,也可能对于大牛来说比if else阅读起来更快。
节点定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private static class Node <E > { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this , itemOffset, item); } boolean casItem (E cmp, E val) { return UNSAFE.compareAndSwapObject(this , itemOffset, cmp, val); } void lazySetNext (Node<E> val) { UNSAFE.putOrderedObject(this , nextOffset, val); } boolean casNext (Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this , nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class ; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item" )); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next" )); } catch (Exception e) { throw new Error(e); } } }
poll方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public E poll () { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null )) { if (p != h) updateHead(h, ((q = p.next) != null ) ? q : p); return item; } else if ((q = p.next) == null ) { updateHead(h, p); return null ; } else if (p == q) continue restartFromHead; else p = q; } } }
succ方法 1 2 3 4 5 final Node<E> succ (Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }
remove方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public boolean remove (Object o) { if (o != null ) { Node<E> next, pred = null ; for (Node<E> p = first(); p != null ; pred = p, p = next) { boolean removed = false ; E item = p.item; if (item != null ) { if (!o.equals(item)) { next = succ(p); continue ; } removed = p.casItem(item, null ); } next = succ(p); if (pred != null && next != null ) pred.casNext(p, next); if (removed) return true ; } } return false ; }