0%

并发Queue

本文介绍并发Queue,有几点说明:

  • 其在Java容器中的划分位置见[1]
  • 根接口为Queue,Queue接口文档声明其内元素不得为NULLNULL用作特殊含义,比如“当队列为空时,调用poll()方法返回NULL”),故并发Queue内的元素也不得为NULL
  • 并发Queue的常见使用场景不太用到迭代器,故本文对迭代器不作深入叙述
  • 并发Queue常用于“生产者-消费者模型”,当然“生产者-消费者模型”也可以不用并发Queue,比如还可以用并发List,Disruptor[2]等

接下来主要介绍并发Queue的常见实现类,分为两类:“阻塞队列”和“非阻塞队列”。

这里的“阻塞”有两个维度的语义:

  • 实现操作线程安全时是否阻塞
  • 是否有“队列满时,添加元素失败阻塞”的“添加元素”阻塞方法offer(E e, long timeout, TimeUnit unit)/put(E e);是否有“队列空时,获取且移除队首元素失败阻塞”的“获取且移除队首元素”阻塞方法poll(long timeout, TimeUnit unit)/take()

“阻塞队列”和“非阻塞队列”在以上两个维度的表现如下表。

并发Queue\语义 实现线程安全时是否阻塞 是否有offer(E e, long timeout, TimeUnit unit)/put(E e)阻塞方法;是否有poll(long timeout, TimeUnit unit)/take()阻塞方法
阻塞队列 阻塞 实现offer(E e, long timeout, TimeUnit unit)/put(E e)阻塞方法;实现poll(long timeout, TimeUnit unit)/take()阻塞方法
非阻塞队列 不阻塞 没有

一、阻塞队列

1.1、基本介绍

额外继承实现BlockingQueue接口。

Queue接口主要有“添加元素”,“获取且移除队首元素”和“获取队首元素”三个方法家族,额外继承实现BlockingQueue接口,则引入了分别属于“添加元素”和“获取且移除队首元素”两个方法家族的offer(E e, long timeout, TimeUnit unit)/put(E e)poll(long timeout, TimeUnit unit)/take()方法。

对BlockingQueue接口的三个方法家族进行混淆阐明,具体如下表:

方法\失败处理方式 抛出异常 返回特殊值 一直阻塞(响应中断) 阻塞指定时间(响应中断),超时退出返回特殊值;否则返回正常值
添加元素 add(E e) offer(E e) put(E e) offer(E e,long timeout,TimeUnit unit)
获取且移除队首元素 remove() poll() take() poll(long timeout,TimeUnit unit)
获取队首元素 element() peek() 不可用 不可用

备注

  • 注意remove()方法和remove(Object o)方法的区别

1.2、具体实现子类

BlockingQueue接口的常见实现类如下,类继承结构图如图1:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • DelayQueue
  • DelayedWorkQueue
  • PriorityBlockingQueue
  • LinkedBlockingDeque
  • SynchronousQueue
  • LinkedTransferQueue

图1


对接下来的具体实现子类介绍,主要基于以下几个维度:

  • 是否有界,分为“设计是否有界”和“实际是否有界”,实际有“设计有界,实际有界”,“设计无界,实际无界”和“设计无界,实际有界,可称为假性无界”这3种情况。设计无界有个特点就是remainingCapacity()方法恒返回Integer.MAX_VALUE
  • 底层数据结构基于“数组”,还是“链表”,或是“堆(堆的实现最终仍然基于数组/链表)”
  • 实现操作的线程安全策略
  • 实现“添加元素”和“获取且移除队首元素”阻塞方法的策略
  • 生成的迭代器性质[1]

1.2.1、ArrayBlockingQueue

1、核心原理

  • 设计有界,实际有界
  • 基于循环数组
  • 使用ReentrantLock lock实现操作的线程安全,即会阻塞,默认是一个“非公平锁”,根据构造参数可显式指定是“公平锁”,还是“非公平锁”
  • 使用Condition notFullCondition notEmpty分别实现put(E e)/offer(E e,long timeout,TimeUnit unit)take()/poll(long timeout,TimeUnit unit)阻塞方法的阻塞
  • 生成“弱一致性”迭代器

2、构造参数

  • int capacity:队列容量
  • boolean fair:显式指定ReentrantLock lock对应的锁是“公平锁”还是“非公平锁”,默认是“非公平锁”
  • Collection<? extends E> c:初始化元素来源集合

1.2.2、LinkedBlockingQueue

1、核心原理

  • 设计有界,实际有界
  • 基于链表
  • 使用ReentrantLock takeLockReentrantLock putLock实现操作的线程安全,即会阻塞,默认都是“非公平锁”
  • 使用Condition notFullCondition notEmpty分别实现put(E e)/offer(E e,long timeout,TimeUnit unit)take()/poll(long timeout,TimeUnit unit)阻塞方法的阻塞
  • 生成“弱一致性”迭代器

2、构造参数

  • int capacity:队列容量
  • Collection<? extends E> c:初始化元素来源集合

1.2.3、DelayQueue

1、核心原理

  • 设计无界,实际有界,故是“假性无界”
  • 基于PriorityQueue<E> q = new PriorityQueue<E>()优先级队列实例,PriorityQueue是假性无界的,故DelayQueue也是假性无界的
  • 使用ReentrantLock lock实现操作的线程安全,即会阻塞,默认是“非公平锁”
  • 使用Condition available实现take()/poll(long timeout,TimeUnit unit)阻塞方法的阻塞;假性无界不需要考虑put(E e)/offer(E e,long timeout,TimeUnit unit)阻塞方法的阻塞
  • 生成“快照式”迭代器

2、构造参数

  • Collection<? extends E> c:初始化元素来源集合

1.2.4、DelayedWorkQueue

ScheduledThreadPoolExecutor类的静态内部类。

1、核心原理

  • 设计无界,实际有界,故是“假性无界”
  • 基于数组实现的堆
  • 使用ReentrantLock lock实现操作的线程安全,即会阻塞,默认是“非公平锁”
  • 使用Condition available实现take()/poll(long timeout,TimeUnit unit)阻塞方法的阻塞;假性无界不需要考虑put(E e)/offer(E e,long timeout,TimeUnit unit)阻塞方法的阻塞
  • 生成“快照式”迭代器

2、构造参数

1.2.5、PriorityBlockingQueue

1、核心原理

  • 设计无界,实际有界,故是“假性无界”
  • 基于数组实现的堆
  • 使用ReentrantLock lock实现操作的线程安全,即会阻塞,默认是“非公平锁”
  • 使用Condition notEmpty实现take()/poll(long timeout,TimeUnit unit)阻塞方法的阻塞;假性无界不需要考虑put(E e)/offer(E e,long timeout,TimeUnit unit)阻塞方法的阻塞
  • 生成“快照式”迭代器

2、构造参数

  • int initialCapacity:队列初始容量
  • Comparator<? super E> comparator:堆内元素比较所基于的比较器
  • Collection<? extends E> c:初始化元素来源集合

1.2.6、LinkedBlockingDeque

1、核心原理

  • 设计有界,实际有界
  • 基于链表
  • 使用ReentrantLock lock实现操作的线程安全,即会阻塞,默认是“非公平锁”
  • 使用Condition notFullCondition notEmpty分别实现put(E e)/offer(E e,long timeout,TimeUnit unit)take()/poll(long timeout,TimeUnit unit)阻塞方法的阻塞
  • 生成“弱一致性”迭代器
  • 另外继承实现BlockingDeque接口,对于本文来说不是重点,不作叙述

2、构造参数

  • int capacity:队列容量
  • Collection<? extends E> c:初始化元素来源集合

1.2.7、SynchronousQueue

1、核心原理

  • 设计有界,实际有界,容量为0(其实背后的支撑列表设计和实际都为无界)
  • 基于链表。但与LinkedBlockingQueueLinkedBlockingDeque不同,链表节点既包含“元素内容信息”,也包含“操作类型信息”
  • 使用CAS自旋 + LockSupport.parkNanos/park阻塞实现操作的线程安全,可广义认为是CAS自旋阻塞锁(与[3]中的CAS自旋锁不同,后者没有阻塞,两者的相同之处在于都是广义的锁),即会阻塞,此时无谓“公平锁还是非公平锁”
  • 使用LockSupport.parkNanos/park实现take()/poll(long timeout,TimeUnit unit)阻塞方法的阻塞;使用LockSupport.parkNanos/park实现put(E e)/offer(E e,long timeout,TimeUnit unit)阻塞方法的阻塞
  • 生成“空”迭代器
  • 几个方法的行为见下表
大方法小方法描述
添加元素
add(E e)如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,添加元素失败,抛出异常
offer(E e)如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,添加元素失败,返回false
put(E e)如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回;否则,往链表中添加一个表征“添加元素”操作的节点,阻塞直到被两两匹配
offer(E e,long timeout,TimeUnit unit)如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,往链表中添加一个表征“添加元素”操作的节点,阻塞指定时间,最终结果是:成功被两两匹配,添加元素成功,返回true;超时返回,添加元素失败,返回false
获取且移除队首元素
remove()如果在链表中正好有一个表征“添加元素”操作的节点,两两匹配,获取且移除队首元素成功,返回队首元素,即表征“添加元素”操作节点对应的元素;否则,获取且移除队首元素失败,抛出异常
poll()如果在链表中正好有一个表征“添加元素”操作的节点,两两匹配,获取且移除队首元素成功,返回队首元素,即表征“添加元素”操作节点对应的元素;否则,获取且移除队首元素失败,返回NULL
take()如果在链表中正好有一个表征“添加元素”操作的节点,两两匹配,获取且移除队首元素成功,返回队首元素,即表征“添加元素”操作节点对应的元素;否则,往链表中添加一个表征“获取且移除队首元素”操作的节点,阻塞直到被两两匹配,被唤醒后返回匹配的队首元素
poll(long timeout,TimeUnit unit)如果在链表中正好有一个表征“添加元素”操作的节点,两两匹配,获取且移除队首元素成功,返回队首元素,即表征“添加元素”操作节点对应的元素;否则,往链表中添加一个表征“获取且移除队首元素”操作的节点,阻塞指定时间,最终结果是:成功被两两匹配,被唤醒后返回匹配的队首元素;超时返回,获取且移除队首元素失败,返回NULL
获取队首元素
element()抛出异常
peek()返回NULL
其他
remove(Object o)返回false,表示不存在这个元素,删除失败
size()返回0
remainingCapacity()返回0
iterator()返回“空”迭代器

2、构造参数

  • boolean fair:如果为true,表征“添加元素”/“获取且移除队首元素”操作的节点操作顺序为“先进先出FIFO”,对应于源码中的TransferQueue数据结构;否则,节点操作顺序为“后进先出LIFO”,对应于源码中的TransferStack数据结构。需要注意的是,在源码中有提及TransferStack对应的顺序不定,怎么理解这个矛盾呢?其实本质上是讨论的范畴不同,有“节点操作”和“节点匹配”两个范畴,TransferQueue和TransferStack在两个范畴下的顺序描述如下表
数据结构\范畴 节点操作 节点匹配
TransferQueue FIFO FIFO
TransferStack LIFO 不定,比如“PUT-A PUT-B GET(跟PUT-B匹配) PUT-C GET(跟PUT-C匹配) GET(跟PUT-A匹配)匹配流并不是后入先匹配”

1.2.8、LinkedTransferQueue

1、核心原理

  • 设计无界,实际无界
  • 基于链表。但与LinkedBlockingQueueLinkedBlockingDeque不同,链表节点既包含“元素内容信息”,也包含“操作类型信息”
  • 使用CAS自旋 + LockSupport.parkNanos/park阻塞实现操作的线程安全,可广义认为是CAS自旋阻塞锁(与[3]中的CAS自旋锁不同,后者没有阻塞,两者的相同之处在于都是广义的锁),即会阻塞,此时无谓“公平锁还是非公平锁”
  • 使用LockSupport.parkNanos/park实现take()/poll(long timeout,TimeUnit unit)阻塞方法的阻塞;无界不需要考虑put(E e)/offer(E e,long timeout,TimeUnit unit)阻塞方法的阻塞
  • 生成“弱一致性”迭代器
  • 由于是无界队列,调用add(E e)/offer(E e)/put(E e)/offer(E e,long timeout,TimeUnit unit)方法都会立即成功,另外有3个继承自TransferQueue接口的“添加元素”方法,这3个方法的行为与SynchronousQueue中“添加元素”方法的行为类似:
    • transfer(E e),如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功;否则,往链表中添加一个表征“添加元素”操作的节点,阻塞直到被两两匹配
    • tryTransfer(E e),如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,添加元素失败,返回false
    • tryTransfer(E e, long timeout, TimeUnit unit),如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,往链表中添加一个表征“添加元素”操作的节点,阻塞指定时间,最终结果是:成功被两两匹配,添加元素成功,返回true;超时返回,添加元素失败,返回false

2、构造参数

  • Collection<? extends E> c:初始化元素来源集合

1.3、总结

1.3.1、方法的时间复杂度分析

在时间复杂度分析中,对于ReentrantLock锁的获取操作,可予以忽略而进行正常分析,而对于CAS自旋阻塞锁的获取操作,难以忽略而不可分析。

方法家族 方法 ArrayBlockingQueue LinkedBlockingQueue DelayQueue DelayedWorkQueue PriorityBlockingQueue LinkedBlockingDeque SynchronousQueue LinkedTransferQueue
添加元素
add(E e) O(1) O(1) O(logN) O(logN) O(logN) O(1) / /
offer(E e) O(1) O(1) O(logN) O(logN) O(logN) O(1) / /
put(E e) O(1) O(1) O(logN) O(logN) O(logN) O(1) / /
offer(E e,long timeout,TimeUnit unit) O(1) O(1) O(logN) O(logN) O(logN) O(1) / /
获取且移除队首元素
remove() O(1) O(1) O(logN) O(logN) O(logN) O(1) / /
poll() O(1) O(1) O(logN) O(logN) O(logN) O(1) / /
take() O(1) O(1) O(logN) O(logN) O(logN) O(1) / /
poll(long timeout,TimeUnit unit) O(1) O(1) O(logN) O(logN) O(logN) O(1) / /
获取队首元素
element() O(1) O(1) O(1) O(1) O(1) O(1) O(1) O(N)
peek() O(1) O(1) O(1) O(1) O(1) O(1) O(1) O(N)
其他
remove(Object o) O(N) O(N) O(N) O(N) O(N) O(N) O(1) O(N)
size() O(1) O(1) O(1) O(1) O(1) O(1) O(1) O(N)
remainingCapacity() O(1) O(1) O(1) O(1) O(1) O(1) O(1) O(1)
iterator() O(1),弱一致性 O(1),弱一致性 O(N),快照式 O(N),快照式 O(N),快照式 O(1),弱一致性 O(1),空 O(1),弱一致性

1.3.2、常见选型

1、ArrayBlockingQueue vs LinkedBlockingQueue
跟选型有关的主要差异有两点:

  • 并发性能。前者对于“添加元素”、“获取且移除队首元素”和“获取队首元素”等操作使用同一把锁,后者对于“添加元素”、“获取且移除队首元素”和“获取队首元素”等操作使用独立的两把锁,后者的并发性能好
  • 前者提前分配好数组的固定内存,后者按需动态申请链表节点内存,当然元素对象本身的内存都是动态分配的

二、非阻塞队列

2.1、基本介绍

不额外继承实现BlockingQueue接口。

Queue接口主要有“添加元素”,“获取且移除队首元素”和“获取队首元素”三个方法家族。

对Queue接口的三个方法家族进行混淆阐明,具体如下表:

方法\失败处理方式 抛出异常 返回特殊值
添加元素 add(E e) offer(E e)
获取且移除队首元素 remove() poll()
获取队首元素 element() peek()

备注

  • 注意remove()方法和remove(Object o)方法的区别

2.2、具体实现子类

Queue接口的并发非阻塞常见实现类如下,类继承结构图如图2:

  • ConcurrentLinkedDeque
  • ConcurrentLinkedQueue

图2

对接下来的具体实现子类介绍,主要基于以下几个维度:

  • 是否有界,分为“设计是否有界”和“实际是否有界”,实际有“设计有界,实际有界”,“设计无界,实际无界”和“设计无界,实际有界,可称之为假性无界”这3种情况。设计无界有个特点就是remainingCapacity()方法恒返回Integer.MAX_VALUE,假性无界也是这个返回值
  • 底层数据结构基于“数组”,还是“链表”,或是“堆(堆的实现最终仍然基于数组/链表)”
  • 实现操作线程安全的策略
  • 实现“添加元素”和“获取且移除队首元素”阻塞方法的策略
  • 生成的迭代器性质[1]

2.2.1、ConcurrentLinkedQueue

1、核心原理

  • 设计无界,实际无界
  • 基于链表
  • 使用CAS自旋锁(注意是纯粹的CAS自旋锁,没有阻塞,跟上述的CAS自旋阻塞锁不一样)实现操作的线程安全,即不会阻塞,此时无谓“公平锁还是非公平锁”
  • 没有“添加元素”和“获取且移除队首元素”的阻塞方法
  • 生成“弱一致性”迭代器

2、构造参数

  • Collection<? extends E> c:初始化元素来源集合

2.2.2、ConcurrentLinkedDeque

1、核心原理

  • 设计无界,实际无界
  • 基于链表
  • 使用CAS自旋锁(注意是纯粹的CAS自旋锁,没有阻塞,跟上述的CAS自旋阻塞锁不一样)实现操作的线程安全,即不会阻塞,此时无谓“公平锁还是非公平锁”
  • 没有“添加元素”和“获取且移除队首元素”的阻塞方法
  • 生成“弱一致性”迭代器
  • 另外继承实现Deque接口,对于本文来说不是重点,不作叙述

2、构造参数

  • Collection<? extends E> c:初始化元素来源集合

2.3、总结

2.3.1、方法的时间复杂度分析

在时间复杂度分析中,对于CAS自旋锁的获取操作,难以忽略而不可分析。

大方法小方法 ConcurrentLinkedQueue ConcurrentLinkedDeque
添加元素
add(E e) / /
offer(E e) / /
获取且移除队首元素
remove()//
poll() / /
获取队首元素
element() / /
peek() / /
其他
remove(Object o) / /
size() / /
iterator() O(1),弱一致性 O(1),弱一致性

参考文献

[1]《Java容器》
[2]https://tech.meituan.com/2016/11/18/disruptor.html
[3]《原子操作与锁》

您的支持将鼓励我继续分享!