ConcurrentLinkedQueue 使用了非阻塞的方式提供线程安全的队列,内部使用的是一个普通的单向链表,记录首尾节点,并且首部有一个dummy node
。入队时在尾部加入节点,出队时从首部删除节点。所有对链表的修改都通过CAS操作来保证原子性。下面分析其主要的代码实现。
1. offer(E e)
在单线程情况下,在进行入队操作时,要进行以下步骤:
- 首先要创建一个新节点
newNode
- 然后找到链表的最后一个节点
tail
- 将
tail
指向newNode
(tail.next = newNode
) - 最后更新
tail = newNode
在多线程情况下,由于入队操作涉及到上面所述的多个步骤,无法保证在执行过程中其它线程不会对链表进行修改,具体表现在:
3
中进行链接时,链表中最后一个节点已经不再是2
中读取到的tail
。因此要重新寻找尾部节点4
中进行tail
更新前,newNode
不再是链表中最后一个节点。因此要将tail
链接到真正的尾部节点
1.1 实现代码
接下来看一下offer(E e)
的实现代码:
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
L1 public boolean offer(E e) {
L2 checkNotNull(e);
L3 final Node<E> newNode = new Node<E>(e);
L4 for (Node<E> t = tail, p = t;;) {
L5 Node<E> q = p.next;
L6 if (q == null) {
// p is last node
L7 if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
L8 if (p != t) // hop two nodes at a time
L9 casTail(t, newNode); // Failure is OK.
L10 return true;
}
// Lost CAS race to another thread; re-read next
}
L11 else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
L12 p = (t != (t = tail)) ? t : head;
L13 else
// Check for tail updates after two hops.
L14 p = (p != t && t != (t = tail)) ? t : q;
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
代码中t != (t = tail)
这样的写法令人难以理解,其实等价于:
{ // 读取t的值并与tail进行比较,比较结果作为返回值
// 比较完之后更新t的值为tail
boolean ret = t != tail;
t = tail;
return ret;
}
1.2 图示
上面代码中的变量命名有点让人难以理解,这里我们令
QTail = tail
,表示链表最后一个节点tail = t
,表示临时读取出的QTail
,多线程环境下执行过程中tail
不一定等于QTail
ptr = p
,ptr
是一个指针,用来寻找链表最后一个节点next = q
,next = ptr.next
,通过next == null
来判断ptr
是不是最后一个节点。
Figure 1
是单线程情况下,队列[1, 2]
插入3
的描述图,图中左侧行号表示在执行完该行的代码之后,链表的情况。
可以看到由于L8
的判断失败,QTail
并没有进行更新,因为代码采用了延时更新的策略,累计两个操作之后才会一次性把尾节点移动两步。(QHead
也是一样的)
Figure 2
是单线程情况下,队列[1,2,3]
插入4
的描述图。
思路比较简单,由于采用了延时更新的策略,需要向右移动ptr
(L14: ptr = next
)才能找到尾节点,这样也就导致了在ptr
指向尾节点时ptr != QTail
,故需要更新QTail
(L9
)。
多线程环境下,L14
中的判断可能为真,表示QTail
被其它线程更新了,故需要直接调整tail = QTail
而不是tail = next
。如Figure 3
所示。
多线程环境下,更新操作L9
可能会失败,因为也许其它线程又在4
后面添加了一个新的节点5
,并抢先更新了QTail
多线程环境下,节点还可能被删除,删除时会将next
指针指向自己。因此L12
可能会发生。故需要将ptr
指向QTail
。当QTail
也被删除时,则需要从QHead
开始重新寻找尾节点了。
至此,offer(E e)
代码的所有分支都已经覆盖
2. poll()
在单线程情况下,在进行出队操作时,要进行以下步骤:
- 首先要找到链表的第一个有效节点
node
(viahead
) - 将该
node
的值设为null
- 将旧
head
的next
指针指向自己 - 更新
head
到node
在多线程情况下,由于出队操作涉及到上面所述的多个步骤,无法保证在执行过程中其它线程不会对链表进行修改。遇到并发修改时,代码的目的就是重新找到第一个有效节点。
2.1 实现代码
public E poll() {
L1 restartFromHead:
L2 for (;;) {
L3 for (Node<E> h = head, p = h, q;;) {
L4 E item = p.item;
L5 if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
L6 if (p != h) // hop two nodes at a time
L7 updateHead(h, ((q = p.next) != null) ? q : p);
L8 return item;
}
L9 else if ((q = p.next) == null) {
L10 updateHead(h, p);
L11 return null;
}
L12 else if (p == q)
L13 continue restartFromHead;
else
L14 p = q;
}
}
}
2.2 图示
上面代码中的变量命名有点让人难以理解,这里我们令
QHead = head
,表示链表dummy node
或者是第一个有效节点head = t
,表示临时读取出的QHead
,多线程环境下执行过程中head
不一定等于QHead
ptr = p
,ptr
是一个指针,用来寻找链表第一个有效节点next = q
,next = ptr.next
,通过next != null
来判断ptr
是不是第一个有效节点
Figure 4
是队列[1,2,3]
删除1
的图示,可以看到删除之后由于滞后更新策略,QHead
一下移动了两个节点,直接指向了第一个有效节点。
Figure 5
在Figure 4
的基础上继续进行poll()
,由于滞后更新,QHead
并没有移动。
Figure 6
在Figure 5
的基础上继续进行poll()
,可以发现QHead
跑到QTail
后面去了哦。L7
的判定结果为假,故没有将QHead
移动两步。
继续poll
时会执行L10
。至于L13
,多线程环境下可能QHead
以及移到next
后面去了。
至此,poll()
代码的所有分支都已经覆盖
(完)
References:
- ConcurrentLinkedQueue
- Michael, Maged M., and Michael L. Scott. "Simple, fast, and practical non-blocking and blocking concurrent queue algorithms." Proceedings of the fifteenth annual ACM symposium on Principles of distributed computing. ACM, 1996.
- Compare-and-swap
- ConcurrentLinkedQueue 源码分析 (基于Java 8)