本文介绍并发Queue,有几点说明:
- 其在Java容器中的划分位置见[1]
- 根接口为Queue,Queue接口文档声明其内元素不得为
NULL
(NULL
用作特殊含义,比如“当队列为空时,调用poll()
方法返回NULL
”),故并发Queue内的元素也不得为NULL
- 并发Queue的常见使用场景不太用到迭代器,故本文对迭代器不作深入叙述
- 并发Queue常用于“生产者-消费者模型”,当然“生产者-消费者模型”也可以不用并发Queue,比如还可以用并发List,Disruptor[2]等
接下来主要介绍并发Queue的常见实现类,分为两类:“阻塞队列”和“非阻塞队列”。
这里的“阻塞”有两个维度的语义:
- 实现操作线程安全时是否阻塞
- 是否有“队列满时,添加元素失败阻塞”的“添加元素”阻塞方法
offer(E e, long timeout, TimeUnit unit)/put(E e)
;是否有“队列空时,获取且移除队首元素失败阻塞”的“获取且移除队首元素”阻塞方法poll(long timeout, TimeUnit unit)/take()
“阻塞队列”和“非阻塞队列”在以上两个维度的表现如下表。
并发Queue\语义 | 实现线程安全时是否阻塞 | 是否有offer(E e, long timeout, TimeUnit unit)/put(E e) 阻塞方法;是否有poll(long timeout, TimeUnit unit)/take() 阻塞方法 |
---|---|---|
阻塞队列 | 阻塞 | 实现offer(E e, long timeout, TimeUnit unit)/put(E e) 阻塞方法;实现poll(long timeout, TimeUnit unit)/take() 阻塞方法 |
非阻塞队列 | 不阻塞 | 没有 |
一、阻塞队列
1.1、基本介绍
额外继承实现BlockingQueue接口。
Queue接口主要有“添加元素”,“获取且移除队首元素”和“获取队首元素”三个方法家族,额外继承实现BlockingQueue接口,则引入了分别属于“添加元素”和“获取且移除队首元素”两个方法家族的offer(E e, long timeout, TimeUnit unit)/put(E e)
和poll(long timeout, TimeUnit unit)/take()
方法。
对BlockingQueue接口的三个方法家族进行混淆阐明,具体如下表:
方法\失败处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞(响应中断) | 阻塞指定时间(响应中断),超时退出返回特殊值;否则返回正常值 |
---|---|---|---|---|
添加元素 | add(E e) |
offer(E e) |
put(E e) |
offer(E e,long timeout,TimeUnit unit) |
获取且移除队首元素 | remove() |
poll() |
take() |
poll(long timeout,TimeUnit unit) |
获取队首元素 | element() |
peek() |
不可用 | 不可用 |
备注:
- 注意
remove()
方法和remove(Object o)
方法的区别
1.2、具体实现子类
BlockingQueue接口的常见实现类如下,类继承结构图如图1:
- ArrayBlockingQueue
- LinkedBlockingQueue
- DelayQueue
- DelayedWorkQueue
- PriorityBlockingQueue
- LinkedBlockingDeque
- SynchronousQueue
- LinkedTransferQueue
图1
对接下来的具体实现子类介绍,主要基于以下几个维度:
- 是否有界,分为“设计是否有界”和“实际是否有界”,实际有“设计有界,实际有界”,“设计无界,实际无界”和“设计无界,实际有界,可称为假性无界”这3种情况。设计无界有个特点就是
remainingCapacity()
方法恒返回Integer.MAX_VALUE
- 底层数据结构基于“数组”,还是“链表”,或是“堆(堆的实现最终仍然基于数组/链表)”
- 实现操作的线程安全策略
- 实现“添加元素”和“获取且移除队首元素”阻塞方法的策略
- 生成的迭代器性质[1]
1.2.1、ArrayBlockingQueue
1、核心原理
- 设计有界,实际有界
- 基于循环数组
- 使用
ReentrantLock lock
实现操作的线程安全,即会阻塞,默认是一个“非公平锁”,根据构造参数可显式指定是“公平锁”,还是“非公平锁” - 使用
Condition notFull
和Condition notEmpty
分别实现put(E e)/offer(E e,long timeout,TimeUnit unit)
和take()/poll(long timeout,TimeUnit unit)
阻塞方法的阻塞 - 生成“弱一致性”迭代器
2、构造参数
int capacity
:队列容量boolean fair
:显式指定ReentrantLock lock
对应的锁是“公平锁”还是“非公平锁”,默认是“非公平锁”Collection<? extends E> c
:初始化元素来源集合
1.2.2、LinkedBlockingQueue
1、核心原理
- 设计有界,实际有界
- 基于链表
- 使用
ReentrantLock takeLock
和ReentrantLock putLock
实现操作的线程安全,即会阻塞,默认都是“非公平锁” - 使用
Condition notFull
和Condition notEmpty
分别实现put(E e)/offer(E e,long timeout,TimeUnit unit)
和take()/poll(long timeout,TimeUnit unit)
阻塞方法的阻塞 - 生成“弱一致性”迭代器
2、构造参数
int capacity
:队列容量Collection<? extends E> c
:初始化元素来源集合
1.2.3、DelayQueue
1、核心原理
- 设计无界,实际有界,故是“假性无界”
- 基于
PriorityQueue<E> q = new PriorityQueue<E>()
优先级队列实例,PriorityQueue是假性无界的,故DelayQueue也是假性无界的 - 使用
ReentrantLock lock
实现操作的线程安全,即会阻塞,默认是“非公平锁” - 使用
Condition available
实现take()/poll(long timeout,TimeUnit unit)
阻塞方法的阻塞;假性无界不需要考虑put(E e)/offer(E e,long timeout,TimeUnit unit)
阻塞方法的阻塞 - 生成“快照式”迭代器
2、构造参数
Collection<? extends E> c
:初始化元素来源集合
1.2.4、DelayedWorkQueue
ScheduledThreadPoolExecutor类的静态内部类。
1、核心原理
- 设计无界,实际有界,故是“假性无界”
- 基于数组实现的堆
- 使用
ReentrantLock lock
实现操作的线程安全,即会阻塞,默认是“非公平锁” - 使用
Condition available
实现take()/poll(long timeout,TimeUnit unit)
阻塞方法的阻塞;假性无界不需要考虑put(E e)/offer(E e,long timeout,TimeUnit unit)
阻塞方法的阻塞 - 生成“快照式”迭代器
2、构造参数
无
1.2.5、PriorityBlockingQueue
1、核心原理
- 设计无界,实际有界,故是“假性无界”
- 基于数组实现的堆
- 使用
ReentrantLock lock
实现操作的线程安全,即会阻塞,默认是“非公平锁” - 使用
Condition notEmpty
实现take()/poll(long timeout,TimeUnit unit)
阻塞方法的阻塞;假性无界不需要考虑put(E e)/offer(E e,long timeout,TimeUnit unit)
阻塞方法的阻塞 - 生成“快照式”迭代器
2、构造参数
int initialCapacity
:队列初始容量Comparator<? super E> comparator
:堆内元素比较所基于的比较器Collection<? extends E> c
:初始化元素来源集合
1.2.6、LinkedBlockingDeque
1、核心原理
- 设计有界,实际有界
- 基于链表
- 使用
ReentrantLock lock
实现操作的线程安全,即会阻塞,默认是“非公平锁” - 使用
Condition notFull
和Condition notEmpty
分别实现put(E e)/offer(E e,long timeout,TimeUnit unit)
和take()/poll(long timeout,TimeUnit unit)
阻塞方法的阻塞 - 生成“弱一致性”迭代器
- 另外继承实现
BlockingDeque
接口,对于本文来说不是重点,不作叙述
2、构造参数
int capacity
:队列容量Collection<? extends E> c
:初始化元素来源集合
1.2.7、SynchronousQueue
1、核心原理
- 设计有界,实际有界,容量为0(其实背后的支撑列表设计和实际都为无界)
- 基于链表。但与
LinkedBlockingQueue
和LinkedBlockingDeque
不同,链表节点既包含“元素内容信息”,也包含“操作类型信息” - 使用
CAS自旋 + LockSupport.parkNanos/park
阻塞实现操作的线程安全,可广义认为是CAS自旋阻塞锁
(与[3]中的CAS自旋锁
不同,后者没有阻塞,两者的相同之处在于都是广义的锁),即会阻塞,此时无谓“公平锁还是非公平锁” - 使用
LockSupport.parkNanos/park
实现take()/poll(long timeout,TimeUnit unit)
阻塞方法的阻塞;使用LockSupport.parkNanos/park
实现put(E e)/offer(E e,long timeout,TimeUnit unit)
阻塞方法的阻塞 - 生成“空”迭代器
- 几个方法的行为见下表
大方法 | 小方法 | 描述 |
---|---|---|
添加元素 | ||
add(E e) | 如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,添加元素失败,抛出异常 | |
offer(E e) | 如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,添加元素失败,返回false | |
put(E e) | 如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回;否则,往链表中添加一个表征“添加元素”操作的节点,阻塞直到被两两匹配 | |
offer(E e,long timeout,TimeUnit unit) | 如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,往链表中添加一个表征“添加元素”操作的节点,阻塞指定时间,最终结果是:成功被两两匹配,添加元素成功,返回true;超时返回,添加元素失败,返回false | |
获取且移除队首元素 | ||
remove() | 如果在链表中正好有一个表征“添加元素”操作的节点,两两匹配,获取且移除队首元素成功,返回队首元素,即表征“添加元素”操作节点对应的元素;否则,获取且移除队首元素失败,抛出异常 | |
poll() | 如果在链表中正好有一个表征“添加元素”操作的节点,两两匹配,获取且移除队首元素成功,返回队首元素,即表征“添加元素”操作节点对应的元素;否则,获取且移除队首元素失败,返回NULL | |
take() | 如果在链表中正好有一个表征“添加元素”操作的节点,两两匹配,获取且移除队首元素成功,返回队首元素,即表征“添加元素”操作节点对应的元素;否则,往链表中添加一个表征“获取且移除队首元素”操作的节点,阻塞直到被两两匹配,被唤醒后返回匹配的队首元素 | |
poll(long timeout,TimeUnit unit) | 如果在链表中正好有一个表征“添加元素”操作的节点,两两匹配,获取且移除队首元素成功,返回队首元素,即表征“添加元素”操作节点对应的元素;否则,往链表中添加一个表征“获取且移除队首元素”操作的节点,阻塞指定时间,最终结果是:成功被两两匹配,被唤醒后返回匹配的队首元素;超时返回,获取且移除队首元素失败,返回NULL | |
获取队首元素 | ||
element() | 抛出异常 | |
peek() | 返回NULL | |
其他 | ||
remove(Object o) | 返回false,表示不存在这个元素,删除失败 | |
size() | 返回0 | |
remainingCapacity() | 返回0 | |
iterator() | 返回“空”迭代器 |
2、构造参数
boolean fair
:如果为true,表征“添加元素”/“获取且移除队首元素”操作的节点操作顺序为“先进先出FIFO”,对应于源码中的TransferQueue数据结构;否则,节点操作顺序为“后进先出LIFO”,对应于源码中的TransferStack数据结构。需要注意的是,在源码中有提及TransferStack对应的顺序不定,怎么理解这个矛盾呢?其实本质上是讨论的范畴不同,有“节点操作”和“节点匹配”两个范畴,TransferQueue和TransferStack在两个范畴下的顺序描述如下表
数据结构\范畴 | 节点操作 | 节点匹配 |
---|---|---|
TransferQueue | FIFO | FIFO |
TransferStack | LIFO | 不定,比如“PUT-A PUT-B GET(跟PUT-B匹配) PUT-C GET(跟PUT-C匹配) GET(跟PUT-A匹配) 匹配流并不是后入先匹配” |
1.2.8、LinkedTransferQueue
1、核心原理
- 设计无界,实际无界
- 基于链表。但与
LinkedBlockingQueue
和LinkedBlockingDeque
不同,链表节点既包含“元素内容信息”,也包含“操作类型信息” - 使用
CAS自旋 + LockSupport.parkNanos/park
阻塞实现操作的线程安全,可广义认为是CAS自旋阻塞锁
(与[3]中的CAS自旋锁
不同,后者没有阻塞,两者的相同之处在于都是广义的锁),即会阻塞,此时无谓“公平锁还是非公平锁” - 使用
LockSupport.parkNanos/park
实现take()/poll(long timeout,TimeUnit unit)
阻塞方法的阻塞;无界不需要考虑put(E e)/offer(E e,long timeout,TimeUnit unit)
阻塞方法的阻塞 - 生成“弱一致性”迭代器
- 由于是无界队列,调用
add(E e)/offer(E e)/put(E e)/offer(E e,long timeout,TimeUnit unit)
方法都会立即成功,另外有3个继承自TransferQueue
接口的“添加元素”方法,这3个方法的行为与SynchronousQueue中“添加元素”方法的行为类似:transfer(E e)
,如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功;否则,往链表中添加一个表征“添加元素”操作的节点,阻塞直到被两两匹配tryTransfer(E e)
,如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,添加元素失败,返回falsetryTransfer(E e, long timeout, TimeUnit unit)
,如果在链表中正好有一个表征“获取且移除队首元素”操作的节点,两两匹配,添加元素成功,返回true;否则,往链表中添加一个表征“添加元素”操作的节点,阻塞指定时间,最终结果是:成功被两两匹配,添加元素成功,返回true;超时返回,添加元素失败,返回false
2、构造参数
Collection<? extends E> c
:初始化元素来源集合
1.3、总结
1.3.1、方法的时间复杂度分析
在时间复杂度分析中,对于ReentrantLock锁的获取操作,可予以忽略而进行正常分析,而对于CAS自旋阻塞锁的获取操作,难以忽略而不可分析。
方法家族 | 方法 | ArrayBlockingQueue | LinkedBlockingQueue | DelayQueue | DelayedWorkQueue | PriorityBlockingQueue | LinkedBlockingDeque | SynchronousQueue | LinkedTransferQueue |
---|---|---|---|---|---|---|---|---|---|
添加元素 | |||||||||
add(E e) | O(1) | O(1) | O(logN) | O(logN) | O(logN) | O(1) | / | / | |
offer(E e) | O(1) | O(1) | O(logN) | O(logN) | O(logN) | O(1) | / | / | |
put(E e) | O(1) | O(1) | O(logN) | O(logN) | O(logN) | O(1) | / | / | |
offer(E e,long timeout,TimeUnit unit) | O(1) | O(1) | O(logN) | O(logN) | O(logN) | O(1) | / | / | |
获取且移除队首元素 | |||||||||
remove() | O(1) | O(1) | O(logN) | O(logN) | O(logN) | O(1) | / | / | |
poll() | O(1) | O(1) | O(logN) | O(logN) | O(logN) | O(1) | / | / | |
take() | O(1) | O(1) | O(logN) | O(logN) | O(logN) | O(1) | / | / | |
poll(long timeout,TimeUnit unit) | O(1) | O(1) | O(logN) | O(logN) | O(logN) | O(1) | / | / | |
获取队首元素 | |||||||||
element() | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | O(N) | |
peek() | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | O(N) | |
其他 | |||||||||
remove(Object o) | O(N) | O(N) | O(N) | O(N) | O(N) | O(N) | O(1) | O(N) | |
size() | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | O(N) | |
remainingCapacity() | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | O(1) | |
iterator() | O(1),弱一致性 | O(1),弱一致性 | O(N),快照式 | O(N),快照式 | O(N),快照式 | O(1),弱一致性 | O(1),空 | O(1),弱一致性 |
1.3.2、常见选型
1、ArrayBlockingQueue vs LinkedBlockingQueue
跟选型有关的主要差异有两点:
- 并发性能。前者对于“添加元素”、“获取且移除队首元素”和“获取队首元素”等操作使用同一把锁,后者对于“添加元素”、“获取且移除队首元素”和“获取队首元素”等操作使用独立的两把锁,后者的并发性能好
- 前者提前分配好数组的固定内存,后者按需动态申请链表节点内存,当然元素对象本身的内存都是动态分配的
二、非阻塞队列
2.1、基本介绍
不额外继承实现BlockingQueue接口。
Queue接口主要有“添加元素”,“获取且移除队首元素”和“获取队首元素”三个方法家族。
对Queue接口的三个方法家族进行混淆阐明,具体如下表:
方法\失败处理方式 | 抛出异常 | 返回特殊值 |
---|---|---|
添加元素 | add(E e) |
offer(E e) |
获取且移除队首元素 | remove() |
poll() |
获取队首元素 | element() |
peek() |
备注:
- 注意
remove()
方法和remove(Object o)
方法的区别
2.2、具体实现子类
Queue接口的并发非阻塞常见实现类如下,类继承结构图如图2:
- ConcurrentLinkedDeque
- ConcurrentLinkedQueue
图2
对接下来的具体实现子类介绍,主要基于以下几个维度:
- 是否有界,分为“设计是否有界”和“实际是否有界”,实际有“设计有界,实际有界”,“设计无界,实际无界”和“设计无界,实际有界,可称之为假性无界”这3种情况。设计无界有个特点就是
remainingCapacity()
方法恒返回Integer.MAX_VALUE
,假性无界也是这个返回值 - 底层数据结构基于“数组”,还是“链表”,或是“堆(堆的实现最终仍然基于数组/链表)”
- 实现操作线程安全的策略
- 实现“添加元素”和“获取且移除队首元素”阻塞方法的策略
- 生成的迭代器性质[1]
2.2.1、ConcurrentLinkedQueue
1、核心原理
- 设计无界,实际无界
- 基于链表
- 使用CAS自旋锁(注意是纯粹的CAS自旋锁,没有阻塞,跟上述的CAS自旋阻塞锁不一样)实现操作的线程安全,即不会阻塞,此时无谓“公平锁还是非公平锁”
- 没有“添加元素”和“获取且移除队首元素”的阻塞方法
- 生成“弱一致性”迭代器
2、构造参数
Collection<? extends E> c
:初始化元素来源集合
2.2.2、ConcurrentLinkedDeque
1、核心原理
- 设计无界,实际无界
- 基于链表
- 使用CAS自旋锁(注意是纯粹的CAS自旋锁,没有阻塞,跟上述的CAS自旋阻塞锁不一样)实现操作的线程安全,即不会阻塞,此时无谓“公平锁还是非公平锁”
- 没有“添加元素”和“获取且移除队首元素”的阻塞方法
- 生成“弱一致性”迭代器
- 另外继承实现
Deque
接口,对于本文来说不是重点,不作叙述
2、构造参数
Collection<? extends E> c
:初始化元素来源集合
2.3、总结
2.3.1、方法的时间复杂度分析
在时间复杂度分析中,对于CAS自旋锁的获取操作,难以忽略而不可分析。
大方法 | 小方法 | ConcurrentLinkedQueue | ConcurrentLinkedDeque |
---|---|---|---|
添加元素 | |||
add(E e) | / | / | |
offer(E e) | / | / | |
获取且移除队首元素 | |||
remove() | / | / | |
poll() | / | / | |
获取队首元素 | |||
element() | / | / | |
peek() | / | / | |
其他 | |||
remove(Object o) | / | / | |
size() | / | / | |
iterator() | O(1),弱一致性 | O(1),弱一致性 |
参考文献
[1]《Java容器》
[2]https://tech.meituan.com/2016/11/18/disruptor.html
[3]《原子操作与锁》