一文掌握阻塞队列的使用!
发表于更新于
BlockingQueue 继承了 Queue
接口,是队列的一种。
BlockingQueue 是线程安全的:
- 在很多场景下都可以利用线程安全的队列来优雅地解决业务自身的线程安全问题。
ArrayBlockingQueue
内部是用数组存储元素,利用 ReentrantLock
实现线程安全。
基本特性:
1、有界的阻塞数组,容量一旦创建,后续大小无法修改。
2、元素是有顺序的,按照先入先出进行排序,从队尾插入数据数据,从队头拿数据。
3、队列满时,往队列中 put
数据会被阻塞,队列空时,往队列中拿数据也会被阻塞。
新增数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
|
查询数据:
每次拿数据的位置就是 takeIndex
的位置,在找到本次该拿的数据之后:
- 会把
takeIndex
加 1,计算下次拿数据时的索引位置。
有个特殊情况是:
- 如果本次拿数据的位置已经是队尾了,那么下次拿数据的位置就要从头开始,就是从 0 开始了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
|
LinkedBlockingQueue
内部用链表实现的BlockingQueue。
基本特性:
1、它容量默认就为整型的最大值:Integer.MAX_VALUE
。
2、基于链表的阻塞队列,其底层的数据结构是链表。
3、链表维护先入先出队列,新元素被放在队尾,获取元素从队头部拿。
LinkedBlockingQueue
内部构成,分成三个部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private class Itr implements Iterator<E> { ……………… }
|
1、链表的作用是为了保存当前节点,节点中的数据可以是任意东西,是一个泛型。
- 比如说队列被应用到线程池时,节点就是线程,比如队列被应用到消息队列中,节点就是消息。
2、锁有 take 锁和 put 锁,是为了保证队列操作时的线程安全。
- 设计两种锁,是为了
take
和 put
两种操作可以同时进行,互不影响。
阻塞新增:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
private void enqueue(Node<E> node) { last = last.next = node; }
|
SynchronousQueue
它的容量为 0,没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入。
基本特性:
在内部抽象出了两种算法实现,一种是先入先出的队列,一种是后入先出的堆栈。
两种算法被两个内部类实现:
- 队列不存储数据,所以没有大小,也无法迭代。
- 插入操作的返回必须等待另一个线程完成对应数据的删除操作,反之亦然。
- 队列由两种数据结构组成,分别是后入先出的堆栈和先入先出的队列,堆栈是非公平的,队列是公平的。
PriorityBlockingQueue
支持优先级的无界阻塞队列,可以通过自定义类实现 compareTo()
方法来指定元素排序规则。
- 或者初始化时通过构造器参数
Comparator
来指定排序规则。
DelayQueue
底层使用了排序和超时阻塞实现了延迟队列:
- 排序使用的是
PriorityQueue
排序能力,超时阻塞使用的是锁的等待能力。
DelayQueue 其实就是为了满足延迟执行的场景,在已有 API 的基础上进行了封装。
内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序。
基本特性:
1、队列中元素将在过期时被执行,越靠近队头,越早过期。
2、未过期的元素不能够被 take。
3、不允许空元素。