ConcurrentLinkedQueue 使用了非阻塞的方式提供线程安全的队列,内部使用的是一个普通的单向链表,记录首尾节点,并且首部有一个dummy node。入队时在尾部加入节点,出队时从首部删除节点。所有对链表的修改都通过CAS操作来保证原子性。下面分析其主要的代码实现。

1. offer(E e)

在单线程情况下,在进行入队操作时,要进行以下步骤:

  1. 首先要创建一个新节点newNode
  2. 然后找到链表的最后一个节点tail
  3. tail指向newNodetail.next = newNode
  4. 最后更新tail = newNode

在多线程情况下,由于入队操作涉及到上面所述的多个步骤,无法保证在执行过程中其它线程不会对链表进行修改,具体表现在:

  • 3中进行链接时,链表中最后一个节点已经不再是2中读取到的tail。因此要重新寻找尾部节点
  • 4中进行tail更新前,newNode不再是链表中最后一个节点。因此要将tail链接到真正的尾部节点

1.1 实现代码

接下来看一下offer(E e)的实现代码:

    /**
    * Inserts the specified element at the tail of this queue.
    * As the queue is unbounded, this method will never return {@code false}.
    *
    * @return {@code true} (as specified by {@link Queue#offer})
    * @throws NullPointerException if the specified element is null
    */
L1  public boolean offer(E e) {
L2     checkNotNull(e);
L3     final Node<E> newNode = new Node<E>(e);

L4     for (Node<E> t = tail, p = t;;) {
L5         Node<E> q = p.next;
L6         if (q == null) {
               // p is last node
L7             if (p.casNext(null, newNode)) {
                   // Successful CAS is the linearization point
                   // for e to become an element of this queue,
                   // and for newNode to become "live".
L8                 if (p != t) // hop two nodes at a time
L9                     casTail(t, newNode);  // Failure is OK.
L10                return true;
               }
               // Lost CAS race to another thread; re-read next
           }
L11        else if (p == q)
               // We have fallen off list.  If tail is unchanged, it
               // will also be off-list, in which case we need to
               // jump to head, from which all live nodes are always
               // reachable.  Else the new tail is a better bet.
L12            p = (t != (t = tail)) ? t : head; 
L13        else
               // Check for tail updates after two hops.
L14            p = (p != t && t != (t = tail)) ? t : q;
       }
    }

    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

代码中t != (t = tail)这样的写法令人难以理解,其实等价于:

{   // 读取t的值并与tail进行比较,比较结果作为返回值
    // 比较完之后更新t的值为tail
    boolean ret = t != tail;
    t = tail;
    return ret;
}

1.2 图示

上面代码中的变量命名有点让人难以理解,这里我们令

  • QTail = tail,表示链表最后一个节点
  • tail = t,表示临时读取出的QTail,多线程环境下执行过程中tail不一定等于QTail
  • ptr = pptr是一个指针,用来寻找链表最后一个节点
  • next = qnext = ptr.next,通过next == null来判断ptr是不是最后一个节点。

Figure 1单线程情况下,队列[1, 2]插入3的描述图,图中左侧行号表示在执行完该行的代码之后,链表的情况。

可以看到由于L8的判断失败,QTail并没有进行更新,因为代码采用了延时更新的策略,累计两个操作之后才会一次性把尾节点移动两步。(QHead也是一样的)

Figure 2单线程情况下,队列[1,2,3]插入4的描述图。

思路比较简单,由于采用了延时更新的策略,需要向右移动ptrL14: ptr = next)才能找到尾节点,这样也就导致了在ptr指向尾节点时ptr != QTail,故需要更新QTailL9)。

多线程环境下L14中的判断可能为真,表示QTail被其它线程更新了,故需要直接调整tail = QTail而不是tail = next。如Figure 3所示。

多线程环境下,更新操作L9可能会失败,因为也许其它线程又在4后面添加了一个新的节点5,并抢先更新了QTail

多线程环境下,节点还可能被删除,删除时会将next指针指向自己。因此L12可能会发生。故需要将ptr指向QTail。当QTail也被删除时,则需要从QHead开始重新寻找尾节点了。

至此,offer(E e)代码的所有分支都已经覆盖

2. poll()

在单线程情况下,在进行出队操作时,要进行以下步骤:

  1. 首先要找到链表的第一个有效节点node(via head)
  2. 将该node的值设为null
  3. 将旧headnext指针指向自己
  4. 更新headnode

在多线程情况下,由于出队操作涉及到上面所述的多个步骤,无法保证在执行过程中其它线程不会对链表进行修改。遇到并发修改时,代码的目的就是重新找到第一个有效节点。

2.1 实现代码

    public E poll() {
L1     restartFromHead:
L2     for (;;) {
L3         for (Node<E> h = head, p = h, q;;) {
L4             E item = p.item;

L5             if (item != null && p.casItem(item, null)) {
                   // Successful CAS is the linearization point
                   // for item to be removed from this queue.
L6                 if (p != h) // hop two nodes at a time
L7                     updateHead(h, ((q = p.next) != null) ? q : p);
L8                 return item;
               }
L9             else if ((q = p.next) == null) {
L10                updateHead(h, p);
L11                return null;
               }
L12            else if (p == q)
L13                continue restartFromHead;
               else
L14                p = q;
           }
       }
    }

2.2 图示

上面代码中的变量命名有点让人难以理解,这里我们令

  • QHead = head,表示链表dummy node或者是第一个有效节点
  • head = t,表示临时读取出的QHead,多线程环境下执行过程中head不一定等于QHead
  • ptr = pptr是一个指针,用来寻找链表第一个有效节点
  • next = qnext = ptr.next,通过next != null来判断ptr是不是第一个有效节点

Figure 4是队列[1,2,3]删除1的图示,可以看到删除之后由于滞后更新策略,QHead一下移动了两个节点,直接指向了第一个有效节点。

Figure 5Figure 4的基础上继续进行poll(),由于滞后更新,QHead并没有移动。

Figure 6Figure 5的基础上继续进行poll(),可以发现QHead跑到QTail后面去了哦。L7的判定结果为假,故没有将QHead移动两步。

继续poll时会执行L10。至于L13,多线程环境下可能QHead以及移到next后面去了。

至此,poll()代码的所有分支都已经覆盖

(完)

References:

  1. ConcurrentLinkedQueue
  2. Michael, Maged M., and Michael L. Scott. "Simple, fast, and practical non-blocking and blocking concurrent queue algorithms." Proceedings of the fifteenth annual ACM symposium on Principles of distributed computing. ACM, 1996.
  3. Compare-and-swap
  4. ConcurrentLinkedQueue 源码分析 (基于Java 8)