ConcurrentLinkedQueue 使用了非阻塞的方式提供线程安全的队列,内部使用的是一个普通的单向链表,记录首尾节点,并且首部有一个dummy node。入队时在尾部加入节点,出队时从首部删除节点。所有对链表的修改都通过CAS操作来保证原子性。下面分析其主要的代码实现。
1. offer(E e)
在单线程情况下,在进行入队操作时,要进行以下步骤:
- 首先要创建一个新节点
newNode - 然后找到链表的最后一个节点
tail - 将
tail指向newNode(tail.next = newNode) - 最后更新
tail = newNode
在多线程情况下,由于入队操作涉及到上面所述的多个步骤,无法保证在执行过程中其它线程不会对链表进行修改,具体表现在:
3中进行链接时,链表中最后一个节点已经不再是2中读取到的tail。因此要重新寻找尾部节点4中进行tail更新前,newNode不再是链表中最后一个节点。因此要将tail链接到真正的尾部节点
1.1 实现代码
接下来看一下offer(E e)的实现代码:
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
L1 public boolean offer(E e) {
L2 checkNotNull(e);
L3 final Node<E> newNode = new Node<E>(e);
L4 for (Node<E> t = tail, p = t;;) {
L5 Node<E> q = p.next;
L6 if (q == null) {
// p is last node
L7 if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
L8 if (p != t) // hop two nodes at a time
L9 casTail(t, newNode); // Failure is OK.
L10 return true;
}
// Lost CAS race to another thread; re-read next
}
L11 else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
L12 p = (t != (t = tail)) ? t : head;
L13 else
// Check for tail updates after two hops.
L14 p = (p != t && t != (t = tail)) ? t : q;
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
代码中t != (t = tail)这样的写法令人难以理解,其实等价于:
{ // 读取t的值并与tail进行比较,比较结果作为返回值
// 比较完之后更新t的值为tail
boolean ret = t != tail;
t = tail;
return ret;
}
1.2 图示
上面代码中的变量命名有点让人难以理解,这里我们令
QTail = tail,表示链表最后一个节点tail = t,表示临时读取出的QTail,多线程环境下执行过程中tail不一定等于QTailptr = p,ptr是一个指针,用来寻找链表最后一个节点next = q,next = ptr.next,通过next == null来判断ptr是不是最后一个节点。
Figure 1是单线程情况下,队列[1, 2]插入3的描述图,图中左侧行号表示在执行完该行的代码之后,链表的情况。

可以看到由于L8的判断失败,QTail并没有进行更新,因为代码采用了延时更新的策略,累计两个操作之后才会一次性把尾节点移动两步。(QHead也是一样的)
Figure 2是单线程情况下,队列[1,2,3]插入4的描述图。

思路比较简单,由于采用了延时更新的策略,需要向右移动ptr(L14: ptr = next)才能找到尾节点,这样也就导致了在ptr指向尾节点时ptr != QTail,故需要更新QTail(L9)。

多线程环境下,L14中的判断可能为真,表示QTail被其它线程更新了,故需要直接调整tail = QTail而不是tail = next。如Figure 3所示。
多线程环境下,更新操作L9可能会失败,因为也许其它线程又在4后面添加了一个新的节点5,并抢先更新了QTail
多线程环境下,节点还可能被删除,删除时会将next指针指向自己。因此L12可能会发生。故需要将ptr指向QTail。当QTail也被删除时,则需要从QHead开始重新寻找尾节点了。
至此,offer(E e)代码的所有分支都已经覆盖
2. poll()
在单线程情况下,在进行出队操作时,要进行以下步骤:
- 首先要找到链表的第一个有效节点
node(viahead) - 将该
node的值设为null - 将旧
head的next指针指向自己 - 更新
head到node
在多线程情况下,由于出队操作涉及到上面所述的多个步骤,无法保证在执行过程中其它线程不会对链表进行修改。遇到并发修改时,代码的目的就是重新找到第一个有效节点。
2.1 实现代码
public E poll() {
L1 restartFromHead:
L2 for (;;) {
L3 for (Node<E> h = head, p = h, q;;) {
L4 E item = p.item;
L5 if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
L6 if (p != h) // hop two nodes at a time
L7 updateHead(h, ((q = p.next) != null) ? q : p);
L8 return item;
}
L9 else if ((q = p.next) == null) {
L10 updateHead(h, p);
L11 return null;
}
L12 else if (p == q)
L13 continue restartFromHead;
else
L14 p = q;
}
}
}
2.2 图示
上面代码中的变量命名有点让人难以理解,这里我们令
QHead = head,表示链表dummy node或者是第一个有效节点head = t,表示临时读取出的QHead,多线程环境下执行过程中head不一定等于QHeadptr = p,ptr是一个指针,用来寻找链表第一个有效节点next = q,next = ptr.next,通过next != null来判断ptr是不是第一个有效节点
Figure 4是队列[1,2,3]删除1的图示,可以看到删除之后由于滞后更新策略,QHead一下移动了两个节点,直接指向了第一个有效节点。

Figure 5在Figure 4的基础上继续进行poll(),由于滞后更新,QHead并没有移动。

Figure 6在Figure 5的基础上继续进行poll(),可以发现QHead跑到QTail后面去了哦。L7的判定结果为假,故没有将QHead移动两步。

继续poll时会执行L10。至于L13,多线程环境下可能QHead以及移到next后面去了。
至此,poll()代码的所有分支都已经覆盖
(完)
References:
- ConcurrentLinkedQueue
- Michael, Maged M., and Michael L. Scott. "Simple, fast, and practical non-blocking and blocking concurrent queue algorithms." Proceedings of the fifteenth annual ACM symposium on Principles of distributed computing. ACM, 1996.
- Compare-and-swap
- ConcurrentLinkedQueue 源码分析 (基于Java 8)