一文掌握阻塞队列的使用!

BlockingQueue 继承了 Queue 接口,是队列的一种。

BlockingQueue 是线程安全的:

  • 在很多场景下都可以利用线程安全的队列来优雅地解决业务自身的线程安全问题。

ArrayBlockingQueue

内部是用数组存储元素,利用 ReentrantLock 实现线程安全。

基本特性:

1、有界的阻塞数组,容量一旦创建,后续大小无法修改。

2、元素是有顺序的,按照先入先出进行排序,从队尾插入数据数据,从队头拿数据。

3、队列满时,往队列中 put 数据会被阻塞,队列空时,往队列中拿数据也会被阻塞。

新增数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 新增,如果队列满,无限阻塞
public void put(E e) throws InterruptedException {
// 元素不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列如果是满的,就无限等待
// 一直等待队列中有数据被拿走时,自己被唤醒
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
// assert lock.getHoldCount() == 1; 同一时刻只能一个线程进行操作此方法
// assert items[putIndex] == null;
final Object[] items = this.items;
// putIndex 为本次插入的位置
items[putIndex] = x;
// ++ putIndex 计算下次插入的位置
// 如果下次插入的位置,正好等于队尾,下次插入就从 0 开始
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒因为队列空导致的等待线程
notEmpty.signal();
}

查询数据:

每次拿数据的位置就是 takeIndex 的位置,在找到本次该拿的数据之后:

  • 会把 takeIndex 加 1,计算下次拿数据时的索引位置。

有个特殊情况是:

  • 如果本次拿数据的位置已经是队尾了,那么下次拿数据的位置就要从头开始,就是从 0 开始了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空,无限等待
// 直到队列中有数据被 put 后,自己被唤醒
while (count == 0)
notEmpty.await();
// 从队列中拿数据
return dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
final Object[] items = this.items;
// takeIndex 代表本次拿数据的位置,是上一次拿数据时计算好的
E x = (E) items[takeIndex];
// 帮助 gc
items[takeIndex] = null;
// ++ takeIndex 计算下次拿数据的位置
// 如果正好等于队尾的话,下次就从 0 开始拿数据
if (++takeIndex == items.length)
takeIndex = 0;
// 队列实际大小减 1
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒被队列满所阻塞的线程
notFull.signal();
return x;
}

LinkedBlockingQueue

内部用链表实现的BlockingQueue。

基本特性:

1、它容量默认就为整型的最大值:Integer.MAX_VALUE

2、基于链表的阻塞队列,其底层的数据结构是链表。

3、链表维护先入先出队列,新元素被放在队尾,获取元素从队头部拿。

LinkedBlockingQueue 内部构成,分成三个部分:

  • 链表存储 + 锁 + 迭代器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 链表结构 begin
//链表的元素
static class Node<E> {
E item;

//当前元素的下一个,为空表示当前节点是最后一个
Node<E> next;

Node(E x) { item = x; }
}

//链表的容量,默认 Integer.MAX_VALUE
private final int capacity;

//链表已有元素大小,使用 AtomicInteger,所以是线程安全的
private final AtomicInteger count = new AtomicInteger();

//链表头
transient Node<E> head;

//链表尾
private transient Node<E> last;
// 链表结构 end

// 锁 begin
//take 时的锁
private final ReentrantLock takeLock = new ReentrantLock();

// take 的条件队列,condition 可以简单理解为基于 ASQ 同步机制建立的条件队列
private final Condition notEmpty = takeLock.newCondition();

// put 时的锁,设计两把锁的目的,主要为了 take 和 put 可以同时进行
private final ReentrantLock putLock = new ReentrantLock();

// put 的条件队列
private final Condition notFull = putLock.newCondition();
// 锁 end

// 迭代器
// 实现了自己的迭代器
private class Itr implements Iterator<E> {
………………
}

1、链表的作用是为了保存当前节点,节点中的数据可以是任意东西,是一个泛型。

  • 比如说队列被应用到线程池时,节点就是线程,比如队列被应用到消息队列中,节点就是消息。

2、锁有 take 锁和 put 锁,是为了保证队列操作时的线程安全。

  • 设计两种锁,是为了 takeput 两种操作可以同时进行,互不影响。

阻塞新增:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 把e新增到队列的尾部。
// 如果有可以新增的空间的话,直接新增成功,否则当前线程陷入等待
public void put(E e) throws InterruptedException {
// e 为空,抛出异常
if (e == null) throw new NullPointerException();
// 预先设置 c 为 -1,约定负数为新增失败
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 设置可中断锁
putLock.lockInterruptibly();
try {
// 队列满了
// 当前线程阻塞,等待其他线程的唤醒(其他线程 take 成功后就会唤醒此处被阻塞的线程)
while (count.get() == capacity) {
// await 无限等待
notFull.await();
}

// 队列没有满,直接新增到队列的尾部
enqueue(node);

// 新增计数赋值,注意这里 getAndIncrement 返回的是旧值
// 这里的 c 是比真实的 count 小 1 的
c = count.getAndIncrement();

// 如果链表现在的大小 小于链表的容量,说明队列未满
// 可以尝试唤醒一个 put 的等待线程
if (c + 1 < capacity)
notFull.signal();

} finally {
// 释放锁
putLock.unlock();
}
// c==0,代表队列里面有一个元素
// 会尝试唤醒一个take的等待线程
if (c == 0)
signalNotEmpty();
}
// 入队,把新元素放到队尾
private void enqueue(Node<E> node) {
last = last.next = node;
}

SynchronousQueue

它的容量为 0,没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入。

  • 每次放数据的时候也会阻塞,直到有消费者来取。

基本特性:

在内部抽象出了两种算法实现,一种是先入先出的队列,一种是后入先出的堆栈。

两种算法被两个内部类实现:

  • 队列不存储数据,所以没有大小,也无法迭代。
  • 插入操作的返回必须等待另一个线程完成对应数据的删除操作,反之亦然。
  • 队列由两种数据结构组成,分别是后入先出的堆栈和先入先出的队列,堆栈是非公平的,队列是公平的。

PriorityBlockingQueue

支持优先级的无界阻塞队列,可以通过自定义类实现 compareTo() 方法来指定元素排序规则。

  • 或者初始化时通过构造器参数 Comparator 来指定排序规则。

DelayQueue

底层使用了排序和超时阻塞实现了延迟队列:

  • 排序使用的是 PriorityQueue 排序能力,超时阻塞使用的是锁的等待能力。

DelayQueue 其实就是为了满足延迟执行的场景,在已有 API 的基础上进行了封装。

内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序。

  • 内部采用的是 的数据结构。

基本特性:

1、队列中元素将在过期时被执行,越靠近队头,越早过期。

2、未过期的元素不能够被 take。

3、不允许空元素。