并发编程

月伴飞鱼 2024-10-24 14:15:26
基础知识
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!

线程

线程优先级

线程的优先级:线程抢占 CPU 时间片的概率,优先级越高的线程优先执行的概率就越大,但并不能保证优先级高的线程一定先执行。

// 线程可以拥有的最小优先级
public final static int MIN_PRIORITY = 1;

// 线程默认优先级
public final static int NORM_PRIORITY = 5;

// 线程可以拥有的最大优先级
public final static int MAX_PRIORITY = 10

线程的常用方法

join():

在一个线程中调用 other.join() ,这时候当前线程会让出执行权给 other 线程,直到 other 线程执行完或者过了超时时间之后再继续执行当前线程。

yield():

给线程调度器一个当前线程愿意出让 CPU 使用权的暗示,但是线程调度器可能会忽略这个暗示。

  • yield() 执行非常不稳定,线程调度器不一定会采纳 yield() 出让 CPU 使用权的建议。

wait/notify和sleep方法的异同?

wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求。

在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放 monitor 锁。

sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复。

wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法。

线程状态

新建状态(New):

通过实现Runnable接口或继承Thread声明一个线程类,new一个实例后,线程就进入了新建状态。

就绪状态(Ready):

线程创建成功后,调用该线程的start()函数,线程进入就绪状态,该状态的线程进入可运行线程池中,等待获取CPU的使用权。

运行状态(Running):

可运行线程池中选择一个线程,该线程进入运行状态。

  • 线程获取到了CPU时间片。
  • 当线程时间片用完或调用的yield()函数,该线程回到就绪状态。

终止状态(Terminated):

线程执行结束或执行过程中因异常意外终止都会使线程进入终止状态。

等待状态(Waiting):

运行状态的线程执行wait()、join()、LockSupport.park()任意函数,该线程进入等待状态。

  • 其中wait()join()函数会让JVM把该线程放入锁等待队列。

  • 处于这种状态的线程不会被分配CPU执行时间,它们要等待被主动唤醒,否则会一直处于等待状态。

  • 执行LockSupport.unpark(t)函数唤醒指定线程,该线程回到就绪状态。

  • 通过notify()、notifyAll()、join线程执行完毕方式,会唤醒锁等待队列的线程,出队的线程回到就绪状态。

超时等待状态(Timed waiting):

超时等待与等待状态一样,唯一的区别就是多了超时机制,不会一直等待被其他线程主动唤醒,而是到达指定时间后会自动唤醒

阻塞状态(Blocked):

运行状态的线程获取同步锁失败或发出I/O请求,该线程进入阻塞状态。

  • 如果是获取同步锁失败JVM还会把该线程放入锁的同步队列。
  • 同步锁被释放时,锁的同步队列会出队所有线程,进入就绪状态。
  • I/O处理完毕时,该线程重新回到就绪状态。

线程安全

多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行问题,也不需要进行额外的同步,而调用这个对象的行为都可以获得正确的结果,那这个对象便是线程安全的。

什么场景需要注意线程安全问题?

访问共享变量或资源:

有访问共享对象的属性,访问 static 静态变量,访问共享的缓存等等。

因为这些信息不仅会被一个线程访问到,还有可能被多个线程同时访问,那么就有可能在并发读写的情况下发生线程安全问题。

依赖时序的操作:

如果我们操作的正确性是依赖时序的,而在多线程的情况下又不能保障执行的顺序和我们预想的一致,这个时候就会发生线程安全问题。

if (map.containsKey(key)) {
    map.remove(obj)
}

互斥与同步的区别

互斥:

  • 某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。

同步:

  • 实现访问者对资源的有序访问。

停止线程

stop()

直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止。

  • 会导致出现数据完整性等问题。

suspend()resume()

如果线程调用 suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题。

  • 因为这把锁在线程被 resume() 之前,是不会被释放的。

interrupt()

一旦调用某个线程的 interrupt() 之后,这个线程的中断标记位就会被设置成 true

每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位。

  • 如果标记位被设置成 true,就说明有程序想终止该线程。

while 循环体判断语句中,通过 Thread.currentThread().isInterrupt()判断线程是否被中断。

while (!Thread.currentThread().islnterrupted() && more work to do) {
    do more work
}
public class StopThread implements Runnable {
 
    @Override
    public void run() {
        int count = 0;
        while (!Thread.currentThread().isInterrupted() && count < 1000) {
            System.out.println("count = " + count++);
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new StopThread());
        thread.start();
        Thread.sleep(5);
        thread.interrupt();
    }
}

如果处于休眠(sleep、wait)中的线程被中断,那么线程是可以感受到中断信号的。

  • 并且会抛出一个 InterruptedException 异常。

  • 同时清除中断信号,将中断标记位设置成 false

这样一来就不用担心长时间休眠中线程感受不到中断了,因为即便线程还在休眠,仍然能够响应中断通知,并抛出异常。

如果负责编写的方法需要被别人调用,方法内调用了 sleep 或者 wait 等能响应中断的方法时,仅仅 catch 住异常是不够的。

方式一:方法签名抛异常,run() 强制 try/catch

将中断信号层层传递到顶层,最终让 run() 方法可以捕获到异常。

void subTask() throws InterruptedException {
    Thread.sleep(1000);
}

方式二:再次中断:

手动添加中断信号,中断信号依然可以被捕捉到。

这样后续执行的方法依然可以检测到这里发生过中断,可以做出相应的处理。

private void reInterrupt() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

生产者消费者模式

public class MyBlockingQueueForCondition {
 
   private Queue queue;
   private int max = 16;
   private ReentrantLock lock = new ReentrantLock();
   private Condition notEmpty = lock.newCondition();
   private Condition notFull = lock.newCondition();
 
 
   public MyBlockingQueueForCondition(int size) {
       this.max = size;
       queue = new LinkedList();
   }
 
   public void put(Object o) throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == max) {
               notFull.await();
           }
           queue.add(o);
           notEmpty.signalAll();
       } finally {
           lock.unlock();
       }
   }
 
   public Object take() throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == 0) {
               notEmpty.await();
           }
           Object item = queue.remove();
           notFull.signalAll();
           return item;
       } finally {
           lock.unlock();
       }
   }
}

Volatile

相比于 synchronized 关键字(重量级锁)对性能影响较大。

使用 volatile 不会引起上下文的切换和调度,所以 volatile 对性能的影响较小,开销较低。

volatile 可以保证其修饰的变量的可见性有序性,无法保证原子性(不能保证完全的原子性,只能保证单次读/写操作具有原子性,即无法保证复合操作的原子性)。

volatile 如何实现可见性?

volatile 修饰的共享变量 flag 被一个线程修改后,JMM(Java内存模型)会把该线程的CPU内存中的共享变量 flag 立即强制刷新回主存中,并且让其他线程的CPU内存中的共享变量 flag 缓存失效,这样当其他线程需要访问该共享变量 flag 时,就会从主存获取最新的数据。

image-20231017152846451

volatile 实现可见性的原理

Lock指令(汇编指令):

volatile 修饰的变量会多一个lock前缀的指令

会将处理器缓存的数据写回主存中,同时使其他线程的处理器缓存的数据失效,这样其他线程需要使用数据时,会从主存中读取最新的数据,从而实现可见性。

内存屏障(CPU指令):

为了性能优化,JMM 在不改变正确语义的前提下,会允许编译器和处理器对指令序列进行重排序。

JMM 提供了内存屏障阻止这种重排序。

Store屏障:

  • 当一个线程修改了volatile变量的值,它会在修改后插入一个写屏障,告诉处理器在写屏障之前将所有存储在缓存中的数据同步到主内存

Load屏障:

  • 当另一个线程读取volatile变量的值,它会在读取前插入一个读屏障,告诉处理器在读屏障之后的所有读操作都能获得内存屏障之前的所有写操作的最新结果

如果被 volatile 修饰时会多一个 ACC_VOLATILE,JVM把字节码生成机器码时会在相应位置插入内存屏障指令,因此可以通过读写屏障实现 volatile 修饰变量的可见性。

volatile 如何实现有序性?

volatile 保证变量有序性,禁止指令重排序。

volatile 实现有序性的原理

Java编译器会在生成指令时在适当位置插入内存屏障来禁止特定类型的处理器重排序。

内存屏障中禁止指令重排序的内存屏障的四种指令:

指令 说明
LoadLoad 屏障 保证在该屏障之后的读操作,不会被重排序到该屏障之前的读操作
StoreStore屏障 保证在该屏障之后的写操作,不会被重排序到该屏障之前的写操作,并且该屏障之前的写操作已被刷入主存
StoreLoad 屏障 保证在该屏障之后的读操作,能够看到该屏障之前的写操作对应变量的最新值
LoadStore 屏障 保证在该屏障之后的写操作,不会被重排序到该屏障之前的读操作

volatile的插入屏障策略

  • 在每个 volatile 操作的前面插入一个 StoreStore 屏障
  • 在每个 volatile 操作的后面插入一个 StoreLoad 屏障
  • 在每个 volatile 操作的后面插入一个 LoadLoad 屏障
  • 在每个 volatile 操作的后面插入一个 LoadStore 屏障

即在每个volatile写操作前后分别插入内存屏障,在每个volatile读操作后插入两个内存屏障。

image-20231017154024898

volatile 为什么不能保证原子性?

volatile 无法保证复合操作的原子性,但能保证单个操作的原子性。

volatile 常见的应用场景?

状态标志位:

使用 volatile 修饰一个变量通过赋值不同的常数或值来标识不同的状态

/**
 * 可以通过布尔值来控制线程的启动和停止
 */
public class MyThread extends Thread {
    
    // 状态标志变量
    private volatile boolean flag = true;
    
    // 根据状态标志位来执行
    public void run() {
        while (flag) {
            // do something
        }
    }
    // 根据状态标志位来停止
    public void stopThread() {
        flag = false; // 改变状态标志变量
    }
}

双重检查DLC:

单例模式的双重检查DLC可以通过 volatile 来修饰从存储单例模式对象的变量。

原子类

CAS

CAS(compareAndSwap)比较交换,是一种无锁原子算法,映射到操作系统就是一条cmpxchg硬件汇编指令(保证原子性)。

其作用是让CPU将内存值更新为新值,但是有个条件,内存值必须与期望值相同,并且CAS操作无需用户态与内核态切换,直接在用户态对内存进行读写操作(意味着不会阻塞/线程上下文切换)。

它包含3个参数CAS(V,E,N)V表示待更新的内存值,E表示预期值,N表示新值,当 V值等于E值时,才会将V值更新成N值,如果V值和E值不等,不做更新,这就是一次CAS的操作。

image-20231017135339775

CAS如何保证原子性

总线锁定:

CPU使用了总线锁。

总线锁就是使用CPU提供的LOCK#信号,当CPU在总线上输出LOCK#信号时,其他CPU的总线请求将被阻塞。

缓存锁定:

总线锁定方式在锁定期间,会导致大量阻塞,增加系统的性能开销。

所谓缓存锁定是指CPU缓存行进行锁定,当缓存行中的共享变量回写到内存时,其他CPU会通过总线嗅探机制感知该共享变量是否发生变化,如果发生变化,让自己对应的共享变量缓存行失效,重新从内存读取最新的数据,缓存锁定是基于缓存一致性机制来实现的,因为缓存一致性机制会阻止两个以上CPU同时修改同一个共享变量(现代CPU基本都支持和使用缓存锁定机制)。

  • 缓存行是CPU高速缓存存储的最小单位。

CAS的问题

只能保证一个共享变量原子操作:

CAS只能针对一个共享变量使用,如果多个共享变量就只能使用锁了,也可以把多个变量整成一个变量。

  • 也可以利用一个新的类,来整合一组共享变量,利用 AtomicReference 来把这个新对象整体进行 CAS 操作。

自旋时间太长:

当一个线程获取锁时失败,不进行阻塞挂起,而是间隔一段时间再次尝试获取,直到成功为止,这种循环获取的机制被称为自旋。

自旋锁好处:持有锁的线程在短时间内释放锁,那些等待竞争锁的线程就不需进入阻塞状态(无需线程上下文切换/无需用户态与内核态切换),它们只需要等一等(自旋),等到持有锁的线程释放锁之后即可获取,这样就避免了用户态和内核态的切换消耗。

自旋锁坏处:线程在长时间内持有锁,等待竞争锁的线程一直自旋,即CPU一直空转,资源浪费在毫无意义的地方。

ABA问题:

CAS需要检查待更新的内存值有没有被修改,如果没有则更新。

存在这样一种情况,如果一个值原来是A,变成了B,然后又变成了A,在CAS检查的时候会发现没有被修改。

  • 有两个线程,线程1读取到内存值A,线程1时间片用完,切换到线程2,线程2也读取到了内存值A,并把它修改为B值,然后再把B值还原到A值,修改次序是A->B->A,接着线程1恢复运行,它发现内存值还是A,然后执行CAS操作。

要解决ABA问题只要追加版本号即可,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A,在Java中提供了AtomicStampedRdference可以实现这个方案。

阻塞队列

BlockingQueue 继承了 Queue 接口,是队列的一种。

BlockingQueue 是线程安全的:

  • 在很多场景下都可以利用线程安全的队列来优雅地解决业务自身的线程安全问题。

ArrayBlockingQueue

内部是用数组存储元素,利用 ReentrantLock 实现线程安全。

基本特性:

1、有界的阻塞数组,容量一旦创建,后续大小无法修改。

2、元素是有顺序的,按照先入先出进行排序,从队尾插入数据数据,从队头拿数据。

3、队列满时,往队列中 put 数据会被阻塞,队列空时,往队列中拿数据也会被阻塞。

新增数据:

// 新增,如果队列满,无限阻塞
public void put(E e) throws InterruptedException {
    // 元素不能为空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 队列如果是满的,就无限等待
        // 一直等待队列中有数据被拿走时,自己被唤醒
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
 
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1; 同一时刻只能一个线程进行操作此方法
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // putIndex 为本次插入的位置
    items[putIndex] = x;
    // ++ putIndex 计算下次插入的位置
    // 如果下次插入的位置,正好等于队尾,下次插入就从 0 开始
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 唤醒因为队列空导致的等待线程
    notEmpty.signal();
}

查询数据:

每次拿数据的位置就是 takeIndex 的位置,在找到本次该拿的数据之后:

  • 会把 takeIndex 加 1,计算下次拿数据时的索引位置。

有个特殊情况是:

  • 如果本次拿数据的位置已经是队尾了,那么下次拿数据的位置就要从头开始,就是从 0 开始了。
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列为空,无限等待
        // 直到队列中有数据被 put 后,自己被唤醒
        while (count == 0)
            notEmpty.await();
        // 从队列中拿数据
        return dequeue();
    } finally {
        lock.unlock();
    }
}
 
private E dequeue() {
    final Object[] items = this.items;
    // takeIndex 代表本次拿数据的位置,是上一次拿数据时计算好的
    E x = (E) items[takeIndex];
    // 帮助 gc
    items[takeIndex] = null;
    // ++ takeIndex 计算下次拿数据的位置
    // 如果正好等于队尾的话,下次就从 0 开始拿数据
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 队列实际大小减 1
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒被队列满所阻塞的线程
    notFull.signal();
    return x;
}

LinkedBlockingQueue

内部用链表实现的BlockingQueue。

基本特性:

1、它容量默认就为整型的最大值:Integer.MAX_VALUE

2、基于链表的阻塞队列,其底层的数据结构是链表。

3、链表维护先入先出队列,新元素被放在队尾,获取元素从队头部拿。

LinkedBlockingQueue 内部构成,分成三个部分:

  • 链表存储 + 锁 + 迭代器。
// 链表结构 begin
//链表的元素
static class Node<E> {
    E item;
 
    //当前元素的下一个,为空表示当前节点是最后一个
    Node<E> next;
 
    Node(E x) { item = x; }
}
 
//链表的容量,默认 Integer.MAX_VALUE
private final int capacity;
 
//链表已有元素大小,使用 AtomicInteger,所以是线程安全的
private final AtomicInteger count = new AtomicInteger();
 
//链表头
transient Node<E> head;
 
//链表尾
private transient Node<E> last;
// 链表结构 end
 
// 锁 begin
//take 时的锁
private final ReentrantLock takeLock = new ReentrantLock();
 
// take 的条件队列,condition 可以简单理解为基于 ASQ 同步机制建立的条件队列
private final Condition notEmpty = takeLock.newCondition();
 
// put 时的锁,设计两把锁的目的,主要为了 take 和 put 可以同时进行
private final ReentrantLock putLock = new ReentrantLock();
 
// put 的条件队列
private final Condition notFull = putLock.newCondition();
// 锁 end
 
// 迭代器 
// 实现了自己的迭代器
private class Itr implements Iterator<E> {
………………
}

1、链表的作用是为了保存当前节点,节点中的数据可以是任意东西,是一个泛型。

  • 比如说队列被应用到线程池时,节点就是线程,比如队列被应用到消息队列中,节点就是消息。

2、锁有 take 锁和 put 锁,是为了保证队列操作时的线程安全。

  • 设计两种锁,是为了 takeput 两种操作可以同时进行,互不影响。

阻塞新增:

// 把e新增到队列的尾部。
// 如果有可以新增的空间的话,直接新增成功,否则当前线程陷入等待
public void put(E e) throws InterruptedException {
    // e 为空,抛出异常
    if (e == null) throw new NullPointerException();
    // 预先设置 c 为 -1,约定负数为新增失败
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 设置可中断锁
    putLock.lockInterruptibly();
    try {
        // 队列满了
        // 当前线程阻塞,等待其他线程的唤醒(其他线程 take 成功后就会唤醒此处被阻塞的线程)
        while (count.get() == capacity) {
            // await 无限等待
            notFull.await();
        }
 
        // 队列没有满,直接新增到队列的尾部
        enqueue(node);
 
        // 新增计数赋值,注意这里 getAndIncrement 返回的是旧值
        // 这里的 c 是比真实的 count 小 1 的
        c = count.getAndIncrement();
 
        // 如果链表现在的大小 小于链表的容量,说明队列未满
        // 可以尝试唤醒一个 put 的等待线程
        if (c + 1 < capacity)
            notFull.signal();
 
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // c==0,代表队列里面有一个元素
    // 会尝试唤醒一个take的等待线程
    if (c == 0)
        signalNotEmpty();
}
// 入队,把新元素放到队尾
private void enqueue(Node<E> node) {
    last = last.next = node;
}

SynchronousQueue

它的容量为 0,没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入。

  • 每次放数据的时候也会阻塞,直到有消费者来取。

基本特性:

在内部抽象出了两种算法实现,一种是先入先出的队列,一种是后入先出的堆栈。

两种算法被两个内部类实现:

  • 队列不存储数据,所以没有大小,也无法迭代。
  • 插入操作的返回必须等待另一个线程完成对应数据的删除操作,反之亦然。
  • 队列由两种数据结构组成,分别是后入先出的堆栈和先入先出的队列,堆栈是非公平的,队列是公平的。

PriorityBlockingQueue

支持优先级的无界阻塞队列,可以通过自定义类实现 compareTo() 方法来指定元素排序规则。

  • 或者初始化时通过构造器参数 Comparator 来指定排序规则。

DelayQueue

底层使用了排序和超时阻塞实现了延迟队列:

  • 排序使用的是 PriorityQueue 排序能力,超时阻塞使用的是锁的等待能力。

DelayQueue 其实就是为了满足延迟执行的场景,在已有 API 的基础上进行了封装。

内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序。

  • 内部采用的是 的数据结构。

基本特性:

1、队列中元素将在过期时被执行,越靠近队头,越早过期。

2、未过期的元素不能够被 take。

3、不允许空元素。

内存模型

JMM抽象结构划分为线程本地缓存与主存,每个线程均有自己的本地缓存,本地缓存是线程私有的,主存则是计算机内存,它是共享的。

为什么需要 JMM(Java Memory Model,Java 内存模型)

程序最终执行的效果会依赖于具体的处理器,而不同的处理器的规则又不一样,不同的处理器之间可能差异很大,因此同样的一段代码,可能在处理器 A 上运行正常,而在处理器 B 上运行的结果却不一致。

  • 在没有 JMM 之前,不同的 JVM 的实现,也会带来不一样的 翻译 结果。

如果达成一致后,就可以很清楚的知道什么样的代码最终可以达到什么样的运行效果,让多线程运行结果可以预期。

主内存和工作内存的关系

image-20231017161900549

JMM 有以下规定:

所有的变量都存储在主内存中,同时每个线程拥有自己独立的工作内存,而工作内存中的变量的内容是主内存中该变量的拷贝。

线程不能直接读/写主内存中的变量,但可以操作自己工作内存中的变量,然后再同步到主内存中,这样,其他线程就可以看到本次修改。

主内存是由多个线程所共享的,但线程间不共享各自的工作内存,如果线程间需要通信,则必须借助主内存中转来完成。

可见性

当一个线程修改了共享变量的值,其他线程能够立即得知这个修改。

能够保证可见性的措施:

volatile 关键字。

synchronized、Lock、并发集合等一系列工具。

原子性

原子性是指一个或者多个操作在CPU执行的过程中不被中断的特性,要么执行,要不执行,不能执行到一半。

比如 i++ 这一行代码在 CPU 中执行时,可能会从一行代码变为以下的 3 个指令:

第一个步骤是读取。

第二个步骤是增加。

第三个步骤是保存。

所以 i++ 是不具备原子性的,也不是线程安全的。

有序性

有序性指禁止指令重排序,即保证程序执行代码的顺序与编写程序的顺序一致(程序执行顺序按照代码的先后顺序执行)。

重排序的3种情况:

编译器优化:

  • 编译器(包括 JVM、JIT 编译器等)出于优化的目的,例如当前有了数据 a,把对 a 的操作放到一起效率会更高,避免读取 b 后又返回来重新读取 a 的时间开销,此时在编译的过程中会进行一定程度的重排。

CPU重排序:

  • CPU 同样会有优化行为,这里的优化和编译器优化类似,都是通过乱序执行的技术来提高整体的执行效率。

内存的重排序:

  • 由于内存有缓存的存在,在 JMM 里表现为主存和本地内存,而主存和本地内存的内容可能不一致,所以这也会导致程序表现出乱序的行为。

重排需要遵循as-if-serial原则,编译器和处理器不会对存在数据依赖关系的操作做重排序,因为这种重排序会改变执行结果。

重排序的好处:提高处理速度。

int i = 10
int j = 10
//这就是数据依赖,int i 与 int j 不能排到 int c下面去
int c = i + j

Happens Before规则

Happens Before关系是用来描述和可见性相关问题的:

如果第一个操作 happens before 第二个操作,那么我们就说第一个操作对于第二个操作一定是可见的,也就是第二个操作在执行时就一定能保证看见第一个操作执行的结果。

不具备 happens before 关系的例子:

如果有两个线程,分别执行 write 和 read 方法,那么由于这两个线程之间没有相互配合的机制,所以 write 和 read 方法内的代码不具备 happens-before 关系,其中的变量的可见性无法保证。

public class Visibility {
    int x = 0;
    public void write() {
        x = 1;
    }
    public void read() {
        int y = x;
    }
}

单线程规则:

  • 在一个单独的线程中,按照程序代码的顺序,先执行的操作 happen before 后执行的操作。

volatile 变量规则:

  • 对一个 volatile 变量的写操作 happen before 后面对该变量的读操作。

线程启动规则:

  • Thread 对象的 start 方法 happen before 此线程 run 方法中的每一个操作。

线程池

为什么要使用线程池

线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度:

  • 因为线程池中的线程是可以复用的,我们只用少量的线程去执行大量的任务,不用频繁创建线程。

线程池可以管理内存和 CPU 的使用,避免资源使用不当:

  • 线程池会根据配置和任务数量灵活地控制线程数量。

线程池可以统一管理资源:

  • 比如线程池可以统一管理任务队列和线程。

线程池参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

corePoolSize:

  • 线程池的核心线程数量,即线程池中可以保持活动状态的最小线程数。
  • 当提交的任务数超过核心线程数量时,线程池可以创建更多的线程来执行任务。

maximumPoolSize:

  • 线程池中允许同时存在的最大线程数量,即当任务数超过核心线程数量时,线程池可以创建的最大线程数量。
  • 当线程池中的线程数量已经达到最大值且任务队列已满时,可以采取拒绝策略处理过多的任务。
  • 可以在创建线程池时指定最大线程数,也可以在运行时动态地修改。

keepAliveTime:

  • 线程空闲后的存活时间。
  • 当线程池中的线程数量超过核心线程数时,空闲的线程会在等待新任务到来的过程中等待一段时间。
  • 如果等待时间超过了 keepAliveTime,则该线程会被销毁,以释放资源。

unit:

  • keepAliveTime 的单位,可以是毫秒、微秒、纳秒等等。

workQueue:

  • 线程池中用于保存等待中任务的阻塞队列。

threadFactory:

  • 线程工厂,用于创建线程的工厂,可以自定义实现。
  • 可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程。
  • 也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名。

handler:

  • 当线程池已经关闭或已满时,新任务的处理策略。
image-20231220132715694

在创建了线程池后,线程池中的线程数为零。

当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:

  • 如果正在运行的线程数量 小于 corePoolSize,那么马上创建线程运行这个任务。

  • 如果正在运行的线程数量 大于或等于 corePoolSize,那么将这个任务放入队列。

  • 如果这个时候队列满了且正在运行的线程数量还 小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务。

  • 如果队列满了且正在运行的线程数量 大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。

  • 当一个线程完成任务时,它会从队列中取下一个任务来执行。

当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

  • 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
  • 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

拒绝策略

线程池会在以下两种情况下会拒绝新提交的任务:

当我们调用 shutdown 等方法关闭线程池后:

  • 即便此时可能线程池内部依然有没执行完的任务正在执行,但由于线程池已经关闭,此时如果再向线程池内提交任务,就会遭到拒绝。

线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。

4种默认的拒绝策略:

第一种拒绝策略是 AbortPolicy

  • 在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,可以根据业务逻辑选择重试或者放弃提交等策略。

第二种拒绝策略是 DiscardPolicy

  • 当新任务被提交后直接被丢弃掉,也不会给你任何的通知,存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。

第三种拒绝策略是 DiscardOldestPolicy

  • 如果线程池没被关闭且没有能力执行,会丢弃任务队列中的头结点,通常是存活时间最长的任务,这样可以腾出空间给新提交的任务。

第四种拒绝策略是 CallerRunsPolicy

  • 当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务,这样做主要有两点好处:
  • 第一点:新提交的任务不会被丢弃,这样也就不会造成业务损失。
  • 第二点:由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度。

以上内置的策略均实现了 RejectedExecutionHandler 接口

可以自己扩展 RejectedExecutionHandler 接口,定义自己的拒绝策略。

使用CallerRunsPolicy顺序问题:

线程数线程池的最大线程数并且阻塞队列已满的情况下,后到的数据会执行拒绝策略,让调用线程(提交任务的线程)直接执行此任务,导致数据处理顺序不一致。

当在多线程中数据处理时需要强关联数据时间顺序时,最好考虑一下其他的处理方式,避免踩坑。

设置线程数

公式:

最佳线程数目 = ((线程等待时间+线程CPU时间)/ 线程CPU时间 )* CPU数目

举例:

服务器CPU核数为4核,一个任务线程CPU耗时为20ms,线程等待(网络IO、磁盘IO)耗时80ms。

最佳线程数目:( 80 + 20 )/20 * 4 = 20

  • 线程的等待时间越大,线程数就要设置越大。

线程数设置多大,是根据我们自身的业务的,需要自己去压力测试,设置一个合理的数值。

CPU密集型:

  • 操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2,核数为4的话,一般设置 5 或 8
  • CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上。
  • 无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。

IO密集型:

  • 会导致浪费大量的CPU运算能力浪费在等待。

  • 文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40

常见线程池

FixedThreadPool:

它的核心线程数和最大线程数是一样的,线程池中的线程数除了初始阶段需要从 0 开始增加外,之后的线程数量就是固定的,就算任务数超过线程数,线程池也不会再创建更多的线程来处理任务,而是会把超出线程处理能力的任务放到任务队列中进行等待。

而且就算任务队列满了,也无法再增加新的线程了。

BlockingQueue选取的是LinkedBlockingQueue。

CachedThreadPool:

它的特点在于线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE,为 2^31-1)。

  • 当线程闲置时还可以对线程进行回收。

它有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。

ScheduledThreadPool:

ScheduledThreadPool,它支持定时或周期性执行任务。

BlockingQueue选取的是延迟队列DelayedWorkQueue。

SingleThreadExecutor:

它会使用唯一的一个线程去执行任务,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。

这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景。

BlockingQueue选取的是LinkedBlockingQueue。

ForkJoinPool

它非常适合执行可以产生子任务的任务。

适合用于递归的场景,例如树的遍历、最优路径搜索等场景

关闭线程池

shutdown():

调用 shutdown() 方法之后线程池并不是立刻就被关闭,线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。

调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。

shutdownNow():

首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行。

队列中正在等待的所有任务转移到一个 List 中并返回,可根据返回的任务 List 来进行一些补救的操作,例如记录在案并在后期重试。

线程异常

当一个线程池里面的线程异常后:

  • 当执行方式是execute时:
    • 会抛出异常,线程池会把这个线程移除掉,并创建一个新的线程放到线程池中。
  • 当执行方式是submit时:
    • 会抛出异常,可以捕获到异常,不会把这个线程移除掉,也不会创建新的线程放入到线程池中。

以上俩种执行方式,都不会影响线程池里面其他线程的正常执行。

线程复用原理

在线程池中,同一个线程可以从 BlockingQueue 中不断提取新任务来执行。

线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程

  • 而是让每个线程去执行一个 循环任务

    • 在这个 循环任务 中,不停地检查是否还有任务等待被执行
  • 如果有则直接去执行这个任务,也就是调用任务的 run 方法

    • run 方法当作和普通方法一样的地位去调用
    • 相当于把每个任务的 run() 方法串联了起来
  • 所以线程数量并不增加。

实现线程复用的逻辑主要在一个不停循环的 while 循环体中

  • 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。

  • 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)。

ThreadPoolExecutor重要属性

//ctl 线程池状态控制字段,由两部分组成:
//1:workerCount  wc 工作线程数,我们限制 workerCount 最大到(2^29)-1,大概 5 亿个线程
//2:runState rs 线程池的状态,提供了生命周期的控制,源码中有很多关于状态的校验,状态枚举如下:
//RUNNING(-536870912):接受新任务或者处理队列里的任务。
//SHUTDOWN(0):不接受新任务,但仍在处理已经在队列里面的任务。
//STOP(-536870912):不接受新任务,也不处理队列中的任务,对正在执行的任务进行中断。
//TIDYING(1073741824): 所以任务都被中断,workerCount 是 0,整理状态
//TERMINATED(1610612736): terminated() 已经完成的时候
 
//runState 之间的转变过程:
//RUNNING -> SHUTDOWN:调用 shudown(),finalize()
//(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()
//SHUTDOWN -> TIDYING -> workerCount ==0
//STOP -> TIDYING -> workerCount ==0
//TIDYING -> TERMINATED -> terminated() 执行完成之后
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// =(2^29)-1=536870911
 
// Packing and unpacking ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int runStateOf(int c)     { return c & ~CAPACITY; }
 
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;//-536870912
private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
private static final int STOP       =  1 << COUNT_BITS;//-536870912
private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
private static final int TERMINATED =  3 << COUNT_BITS;//1610612736
 
// 已完成任务的计数
volatile long completedTasks;
// 线程池最大容量
private int largestPoolSize;
// 已经完成的任务数
private long completedTaskCount;
// 用户可控制的参数都是 volatile 修饰的
// 可以使用 threadFactory 创建 thread
// 创建失败一般不抛出异常,只有在 OutOfMemoryError 时候才会
private volatile ThreadFactory threadFactory;
// 饱和或者运行中拒绝任务的 handler 处理类
private volatile RejectedExecutionHandler handler;
// 线程存活时间设置
private volatile long keepAliveTime;
// 设置 true 的话,核心线程空闲 keepAliveTime 时间后,也会被回收
private volatile boolean allowCoreThreadTimeOut;
// coreSize
private volatile int corePoolSize;
// maxSize 最大限制 (2^29)-1
private volatile int maximumPoolSize;
// 默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
 
// 队列会 hold 住任务,并且利用队列的阻塞的特性,来保持线程的存活周期
private final BlockingQueue<Runnable> workQueue;
 
// 大多数情况下是控制对 workers 的访问权限
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
 
// 包含线程池中所有的工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();

Worker可以理解成线程池中任务运行的最小单元:

// 线程池中任务执行的最小单元
// Worker 继承 AQS,具有锁功能
// Worker 实现 Runnable,本身是一个可执行的任务
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // 任务运行的线程
    final Thread thread;
 
    // 需要执行的任务
    Runnable firstTask;
 
    // 非常巧妙的设计,Worker本身是个 Runnable,把自己作为任务传递给 thread
    // 内部有个属性又设置了 Runnable
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 把 Worker 自己作为 thread 运行的任务
        this.thread = getThreadFactory().newThread(this);
    }
 
   /** Worker 本身是 Runnable,run 方法是 Worker 执行的入口, runWorker 是外部的方法 */
    public void run() {
        runWorker(this);
    }
 
    private static final long serialVersionUID = 6138294804551838833L;
 
    // Lock methods
    // 0 代表没有锁住,1 代表锁住
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    // 尝试加锁,CAS 赋值为 1,表示锁住
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    // 尝试释放锁,释放锁没有 CAS 校验,可以任意的释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
 
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
 
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

在线程池中,最小的执行单位就是 Worker,所以 Worker 实现了 Runnable 接口,实现了 run 方法。

Worker 本身也实现了 AQS,所以其本身也是一个锁,其在执行任务的时候,会锁住自己,任务执行完成之后,会释放自己。

线程池的任务提交

线程池的任务提交从 submit 方法 开始,主要做了两件事情:

把 Runnable 和 Callable 都转化成 FutureTask。

使用 execute 方法执行 FutureTask。

在 execute 方法中,多次调用 addWorker 方法把任务传入,Worker 内部有一个 Thread 对象,它正是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。

线程复用的逻辑实现主要在 Worker 类中的 run 方法里执行的 runWorker 方法中。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 工作的线程小于核心线程数,创建新的线程,成功返回,失败不抛异常
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 线程池状态可能发生变化
        c = ctl.get();
    }
    // 工作的线程大于等于核心线程数,或者新建线程失败
    // 线程池状态正常,并且可以入队的话,尝试入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程池状态异常 尝试从队列中移除任务,可以移除的话就拒绝掉任务
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 发现可运行的线程数是 0,就初始化一个线程,这里是个极限情况,入队的时候,突然发现
        // 可用线程都被回收了
        else if (workerCountOf(recheck) == 0)
            // Runnable是空的,不会影响新增线程,但是线程在 start 的时候不会运行
            // Thread.run() 里面有判断
            addWorker(null, false);
    }
    // 队列满了,开启线程到 maxSize,如果失败直接拒绝,
    else if (!addWorker(command, false))
        reject(command);
}
// 结合线程池的情况看是否可以添加新的 worker
// firstTask 不为空可以直接执行,为空执行不了,Thread.run()方法有判断,Runnable为空不执行
// core 为 true 表示线程最大新增个数是 coresize,false 表示最大新增个数是 maxsize
// 返回 true 代表成功,false 失败
// break retry 跳到retry处,且不再进入循环
// continue retry 跳到retry处,且再次进入循环
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 先是各种状态的校验
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // rs >= SHUTDOWN 说明线程池状态不正常
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            // 工作中的线程数大于等于容量,或者大于等于 coreSize or maxSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                // break 结束 retry 的 for 循环
                break retry;
            c = ctl.get();  // Re-read ctl
            // 线程池状态被更改
            if (runStateOf(c) != rs)
                // 跳转到retry位置
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 巧妙的设计,Worker 本身是个 Runnable.
        // 在初始化的过程中,会把 worker 丢给 thread 去初始化
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程,实际上去执行 Worker.run 方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    //帮助gc回收
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task 为空的情况:
        // 1:任务入队列了,极限情况下,发现没有运行的线程,于是新增一个线程;
        // 2:线程执行完任务执行,再次回到 while 循环。
        // 如果 task 为空,会使用 getTask 方法阻塞从队列中拿数据,如果拿不到数据,会阻塞住
        while (task != null || (task = getTask()) != null) {
            //锁住 worker
            w.lock();
            // 线程池 stop 中,但是线程没有到达中断状态,帮助线程中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //执行 before 钩子函数
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //同步执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行 after 钩子函数,如果这里抛出异常,会覆盖 catch 的异常
                    //所以这里异常最好不要抛出来
                    afterExecute(task, thrown);
                }
            } finally {
                //任务执行完成,计算解锁
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //做一些抛出异常的善后工作
        processWorkerExit(w, completedAbruptly);
    }
}
/**
 * run 方法可以直接被调用
 * 也可以由线程池进行调用
 */
public void run() {
    // 状态不是任务创建,或者当前任务已经有线程在执行了
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // Callable 不为空,并且已经初始化完成
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 调用执行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // 给 outcome 赋值
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

线程执行完任务之后:

// 从阻塞队列中拿任务
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
 
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        //线程池关闭 && 队列为空,不需要在运行了,直接放回
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        int wc = workerCountOf(c);
 
        // Are workers subject to culling?
        // true  运行的线程数大于 coreSize || 核心线程也可以被灭亡
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
        // 队列以 LinkedBlockingQueue 为例,timedOut 为 true 的话说明下面 poll 方法执行返回的是 null
        // 说明在等待 keepAliveTime 时间后,队列中仍然没有数据
        // 说明此线程已经空闲了 keepAliveTime 了
        // 再加上 wc > 1 || workQueue.isEmpty() 的判断
        // 所以使用 compareAndDecrementWorkerCount 方法使线程池数量减少 1
        // 并且直接 return,return 之后,此空闲的线程会自动被回收
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
 
        try {
            // 从队列中阻塞拿 worker
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 设置已超时,说明此时队列没有数据
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

常见问题

父子任务使用一个线程池:

向线程池中提交了一个任务,然后在这个任务的内部实现中又往同一个线程池中再次提交了一个任务,相当于父子任务在同一个线程池中执行,出现线程死锁也就是循环等待的情况。

父任务全部处于执行状态,这时候子任务想要执行需要等父任务执行完成,但是父任务都执行不完,因为还有个子任务没完成,即父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了死锁。

并发工具类

Semaphore信号量

信号量来控制那些需要限制并发访问量的资源。

信号量会维护 许可证 的计数,而线程去访问共享资源前,必须先拿到许可证。

线程可以从信号量中去获取一个许可证,一旦线程获取之后,信号量持有的许可证就转移过去了,所以信号量手中剩余的许可证要减一。

public class SemaphoreDemo {

    static Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 1000; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了许可证,花费2秒执行慢服务");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("慢服务执行完毕," + Thread.currentThread().getName() + "释放了许可证");
            semaphore.release();
        }
    }
}

信号量能被 FixedThreadPool 替代吗?

如果在应用程序中会有不同类型的任务,它们也是通过不同的线程池来调用慢服务的。

因为信号量具有跨线程、跨线程池的特性,所以即便请求来自于不同的线程池,我们也可以限制它们的访问。

如果用 FixedThreadPool 去限制,那就做不到跨线程池限制了。

CountDownLatch

在创建实例的时候,在构造函数中传入倒数次数,然后由需要等待的线程去调用 await 方法开始等待,而每一次其他线程调用了 countDown 方法之后,计数便会减 1,直到减为 0 时,之前等待的线程便会继续运行。

场景1:一个线程等待其他多个线程都执行完毕,再继续自己的工作:

比如在比赛跑步时有 5 个运动员参赛,终点有一个裁判员,什么时候比赛结束呢?

  • 当所有人都跑到终点之后,这相当于裁判员等待 5 个运动员都跑到终点,宣布比赛结束。
public class RunDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println(no + "号运动员完成了比赛。");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("等待5个运动员都跑完.....");
        latch.await();
        System.out.println("所有人都跑完了,比赛结束。");
    }
}

场景2:多个线程等待某一个线程的信号,同时开始执行。

比如在运动会上,在运动员起跑之前都会等待裁判员发号施令,一声令下运动员统一起跑。

public class RunDemo {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("运动员有5秒的准备时间");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(no + "号运动员准备完毕,等待裁判员的发令枪");
                    try {
                        countDownLatch.await();
                        System.out.println(no + "号运动员开始跑步了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.submit(runnable);
        }
        Thread.sleep(5000);
        System.out.println("5秒准备时间已过,发令枪响,比赛开始!");
        countDownLatch.countDown();
    }
}

CyclicBarrier

CyclicBarrier 构造出一个集结点,当某一个线程执行 await() 的时候,它就会到这个集结点开始等待,等待这个栅栏被撤销。

直到预定数量的线程都到了这个集结点之后,这个栅栏就会被撤销,之前等待的线程就在此刻统一出发,继续去执行剩下的任务。

假设班级春游去公园里玩,并且会租借三人自行车,每个人都可以骑,但由于这辆自行车是三人的,所以要凑齐三个人才能骑一辆。

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for (int i = 0; i < 6; i++) {
            new Thread(new Task(i + 1, cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable {

        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("同学" + id + "现在从大门出发,前往自行车驿站");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("同学" + id + "到了自行车驿站,开始等待其他人到达");
                cyclicBarrier.await();
                System.out.println("同学" + id + "开始骑车");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

LockSupport

AQS内部控制线程阻塞和唤醒是通过LockSupport来实现的。

@IntrinsicCandidate
public native void park(boolean isAbsolute, long time);

@IntrinsicCandidate
public native void unpark(Object thread);

CopyOnWriteArrayList

具有以下特征:

线程安全的,多线程环境下可以直接使用,无需加锁。

通过锁 + 数组拷贝 + volatile 关键字保证了线程安全。

  • volatile:值被修改后,其它线程能够立马感知最新值。

每次数组操作,都会把数组拷贝一份出来,在新数组上进行操作,操作成功之后再赋值回去。

CopyOnWriteArrayList数据结构和ArrayList是一致的,底层是个数组,只不过CopyOnWriteArrayList在对数组进行操作的时候。

基本会分四步走:

1、加锁。

2、从原数组中拷贝出新数组。

3、在新数组上进行操作,并把新数组复制给数组容器。

4、解锁。

新增:

// 添加元素到数组尾部
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 得到所有的原数组
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝到新数组里面,新数组的长度是 + 1 的,因为新增会多一个元素
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 在新数组中进行赋值,新元素直接放在数组的尾部
        newElements[len] = e;
        // 替换掉原来的数组
        setArray(newElements);
        return true;
    // finally 里面释放锁,保证即使 try 发生了异常,仍然能够释放锁   
    } finally {
        lock.unlock();
    }
}

适用场景:

读多写少的场景很适合使用 CopyOnWrite 集合。

缺点:

内存占用问题

  • 因为 CopyOnWrite 的写时复制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,这一点会占用额外的内存空间。

在元素较多或者复杂的情况下,复制的开销很大

  • 复制过程不仅会占用双倍内存,还要消耗 CPU 等资源,会降低整体性能。

数据一致性问题

  • CopyOnWrite 容器的修改是先修改副本,所以这次修改对于其他线程来说,并不是实时能看到的,只有在修改完后才能体现出来。

AQS

AQS定义了一套多线程访问共享资源的同步模板。

  • 解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作。

三部分组成:

  • state同步状态、Node组成的CLH队列、ConditionObject条件变量。
    • 包含Node组成的条件单向队列。

image-20231018135610476

整体流程

image-20231018192938937

使用 AQS 写一个线程协作工具类

1、新建一个线程协作工具类,在内部写一个 Sync 类。

  • Sync 类继承 AbstractQueuedSynchronizer,即 AQS。

2、在 Sync 类里,根据是否是独占,来重写对应的方法。

  • 如果是独占,则重写 tryAcquiretryRelease 等方法。
  • 如果是非独占,则重写 tryAcquireSharedtryReleaseShared 等方法。

3、在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用 AQS 对应的方法。

  • 如果是独占则调用 acquirerelease 等方法。
  • 非独占则调用 acquireSharedreleaseSharedacquireSharedInterruptibly 等方法。

同步状态

AQS中维护了一个同步状态变量state,获取、释放资源是否成功都是由state决定的。

  • 比如state>0代表可获取资源,否则无法获取。

  • state的具体语义由实现者去定义。

ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch定义的state语义都不一样:

  • ReentrantLockstate用来表示是否有锁资源。

  • ReentrantReadWriteLockstate16位代表读锁状态,低16位代表写锁状态。

  • Semaphorestate用来表示可用信号的个数。

  • CountDownLatchstate用来表示计数器的值。

CLH队列

CLHAQS内部维护的FIFO(先进先出)双端双向队列(方便尾部节点插入)。

  • 基于链表数据结构。

当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点。

通过CAS原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列:

  • 先进先出保证了公平性。
  • 非阻塞的队列,通过自旋锁和CAS保证节点插入和移除的原子性,实现无锁快速插入。
  • 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁。

Node内部类

NodeAQS的内部类:

  • 每个等待资源的线程都会封装成Node节点组成CLH队列、等待队列。

线程获取资源失败,封装成Node节点从CLH队列尾部入队并阻塞线程。

  • 某线程释放资源时会把CLH队列首部Node节点关联的线程唤醒,再次获取资源。

image-20231018142402280

AQS中提供addWaiter函数完成Node节点的创建与入队。

private Node addWaiter(Node mode) {
        //根据当前线程创建节点,等待状态为0
        Node node = new Node(Thread.currentThread(), mode);
        // 获取尾节点
        Node pred = tail;
        if (pred != null) {
            //如果尾节点不等于null,把当前节点的前驱节点指向尾节点
            node.prev = pred;
            //通过cas把尾节点指向当前节点
            if (compareAndSetTail(pred, node)) {
                //之前尾节点的下个节点指向当前节点
                pred.next = node;
                return node;
            }
        }
        //如果添加失败或队列不存在,执行end函数
        enq(node);
        return node;
    }

添加节点的时候,如果从CLH队列已经存在,通过CAS快速将当前节点添加到队列尾部。

如果添加失败或队列不存在,则指向enq函数自旋入队。

  • 通过自旋CAS尝试往队列尾部插入节点,直到成功。
  • 自旋过程如果发现CLH队列不存在时会初始化CLH队列。
    private Node enq(final Node node) {
        for (;;) { //循环
            //获取尾节点
            Node t = tail;
            if (t == null) {
                //如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
                if (compareAndSetHead(new Node()))
                    //cas成功,尾节点指向哨兵节点
                    tail = head;
            } else {
                //当前节点的前驱节点设指向之前尾节点
                node.prev = t;
                //cas设置把尾节点指向当前节点
                if (compareAndSetTail(t, node)) {
                    //cas成功,之前尾节点的下个节点指向当前节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

条件变量

Objectwait、notify函数是配合Synchronized锁实现线程间同步协作的功能。

AQSConditionObject条件变量,则通过ConditionObjectawaitsignal两类函数完成。

  • 不同于Synchronized锁,一个AQS可以对应多个条件变量,而Synchronized只有一个。

ConditionObject内部维护着一个单向条件队列,不同于CLH队列。

条件队列只入队执行await的线程节点,并且加入条件队列的节点。

  • 不能在CLH队列, 条件队列出队的节点,会入队到CLH队列。

当某个线程执行了ConditionObjectawait函数,阻塞当前线程。

线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObjectsignal函数。

  • 会将条件队列头部线程节点转移到CLH队列参与竞争资源。

image-20231018142940365

image-20231018192059833

AQS采用了模板方法设计模式,提供了两类模板:

  • 一类是独占式模板,另一类是共享形模式。

独占式获取资源

acquire模板函数,模板流程:

  • 线程获取共享资源,如果获取资源成功,线程直接返回。
  • 否则进入CLH队列,直到获取资源成功为止,且整个过程忽略中断的影响。

执行tryAcquire函数,tryAcquire是由子类实现,代表获取资源是否成功。

  • 如果资源获取失败,执行下面的逻辑。

执行addWaiter函数,根据当前线程创建出独占式节点,并入队CLH队列。

执行acquireQueued函数,自旋阻塞等待获取资源。

如果acquireQueued函数中获取资源成功。

  • 根据线程是否被中断状态,来决定执行线程中断逻辑。

image-20231018192408391

    final boolean acquireQueued(final Node node, int arg) {
        //异常状态,默认是
        boolean failed = true;
        try {
            //该线程是否中断过,默认否
            boolean interrupted = false;
            for (;;) {//自旋
                //获取前驱节点
                final Node p = node.predecessor();
                //如果前驱节点是首节点,获取资源(子类实现)
                if (p == head && tryAcquire(arg)) {
                    //获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
                    setHead(node);
                    //原来首节点下个节点指向为null
                    p.next = null; // help GC
                    //非异常状态,防止指向finally逻辑
                    failed = false;
                    //返回线程中断状态
                    return interrupted;
                }
                /**
                 * 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事
                 * 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
                 * 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
                 * 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
                    parkAndCheckInterrupt())
                    //该线程被中断过
                    interrupted = true;
                }
            } finally {
                // 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
                if (failed)
                    cancelAcquire(node);
            }
    }

独占式释放资源

AQS中提供了release模板函数来释放资源,模板流程就是线程释放资源成功。

  • 唤醒CLH队列的第二个线程节点(首节点的下个节点)。
    public final boolean release(int arg) {

        if (tryRelease(arg)) {//释放资源成功,tryRelease子类实现
            //获取头部线程节点
            Node h = head;
            if (h != null && h.waitStatus != 0) //头部线程节点不为null,并且等待状态不为0
                //唤醒CHL队列第二个线程节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    
    private void unparkSuccessor(Node node) {
        //获取节点等待状态
        int ws = node.waitStatus;
        if (ws < 0)
            //cas更新节点状态为0
            compareAndSetWaitStatus(node, ws, 0);
    
        //获取下个线程节点        
        Node s = node.next;
        if (s == null || s.waitStatus > 0) { //如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒线程节点
            LockSupport.unpark(s.thread);
        }
    }

共享式获取资源

acquireShared模板流程就是线程获取共享资源,如果获取到资源,线程直接返回。

  • 否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响。
    public final void acquireShared(int arg) {
        /**
         * 1.负数表示失败
         * 2.0表示成功,但没有剩余可用资源
         * 3.正数表示成功且有剩余资源
         */
        if (tryAcquireShared(arg) < 0) //获取资源失败,tryAcquireShared子类实现
            //自旋阻塞等待获取资源
            doAcquireShared(arg);
    }

共享式释放资源

AQS中提供了releaseShared模板函数来释放资源,模板流程就是线程释放资源成功。

  • 唤醒CHL队列的第二个线程节点(首节点的下个节点)。
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现
            //唤醒后继节点
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    private void doReleaseShared() {
        for (;;) {
            //获取头节点
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
    
                if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
                        continue;            // loop to recheck cases
                    //唤醒头节点下个线程节点
                    unparkSuccessor(h);
                }
                //如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)              
                break;
        }
    }

ReentrantLock

ReentrantLock是可重入的互斥锁。

ReentrantLock底层基于AbstractQueuedSynchronizer实现。

整体结构

ReentrantLock内部定义了专门的组件SyncSync继承AbstractQueuedSynchronizer提供释放资源的实现,NonfairSyncFairSync是基于Sync扩展的子类,即ReentrantLock的非公平模式与公平模式,它们作为Lock接口功能的基本实现。

image-20231017180401150

ReentrantLock中,它对AbstractQueuedSynchronizerstate状态值定义为线程获取该锁的重入次数,state状态值为0表示当前没有被任何线程持有,state状态值为1表示被其他线程持有,因为支持可重入,如果是持有锁的线程,再次获取同一把锁,直接成功,并且state状态值+1,线程释放锁state状态值-1,同理重入多次锁的线程,需要释放相应的次数。

Sync

Sync继承了AbstractQueuedSynchronizer,是ReentrantLock的核心,NonfairSyncFairSync都是基于Sync扩展出来的子类。

Sync实现了AQS类的释放资源(tryRelease),然后抽象了一个获取锁的函数(lock)让子类自行实现。


abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * 获取锁-子类实现
         */
        abstract void lock();

        /**
         * 非公平-获取资源
         */
        final boolean nonfairTryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取当前状态
            int c = getState();
            if (c == 0) { // state==0 代表资源可获取
                //cas设置state为acquires,acquires传入的是1
                if (compareAndSetState(0, acquires)) {
                    //cas成功,设置当前持有锁的线程
                    setExclusiveOwnerThread(current);
                    //返回成功
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { //如果state!=0,但是当前线程是持有锁线程,直接重入
                //state状态+1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //设置state状态,此处不需要cas,因为持有锁的线程只有一个    
                setState(nextc);
                //返回成功
                return true;
            }
            //返回失败
            return false;
        }
        
        /**
         * 释放资源
         */
        protected final boolean tryRelease(int releases) {
            //state状态-releases,releases传入的是1
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) //如果当前线程不是持有锁线程,抛出异常
                throw new IllegalMonitorStateException();
            //设置返回状态,默认为失败
            boolean free = false;
            if (c == 0) {//state-1后,如果c==0代表释放资源成功
                //返回状态设置为true
                free = true;
                //清空持有锁线程
                setExclusiveOwnerThread(null);
            }
            //如果state-1后,state还是>0,代表当前线程有锁重入操作,需要做相应的释放次数,设置state值
            setState(c);
            return free;
        }
}        

tryRelease流程:

image-20231017181031803

NonfairSync

ReentrantLock中支持两种获取锁的策略,分别是非公平策略与公平策略,NonfairSync就是非公平策略。

NonfairSync继承Sync实现了lock函数,CAS设置状态值state1代表获取锁成功,否则执行AQSacquire函数(获取锁模板)。

另外NonfairSync还实现了AQS留给子类实现的tryAcquire函数(获取资源),直接使用Sync提供的nonfairTryAcquire函数来实现tryAcquire,最后子类实现的tryAcquire函数在AQSacquire函数中被使用。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * 获取锁
         */
        final void lock() {
            if (compareAndSetState(0, 1))//cas设置state为1成功,代表获取资源成功    
                //资源获取成功,设置当前线程为持有锁线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //cas设置state为1失败,代表获取资源失败,执行AQS获取锁模板流程,否获取资源成功
                acquire(1);
        }
        
        /**
         * 获取资源-使用的是Sync提供的nonfairTryAcquire函数
         */
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    /**
     * AQS获取锁模板函数,这是AQS类中的函数
     */
    public final void acquire(int arg) {
        /**
         * 我们只需要关注tryAcquire函数,后面的函数是AQS获取资源失败,线程节点进入CLH队列的细节流程,本文不关注
         */
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

当前线程查看资源是否可获取:

可获取,尝试使用CAS设置state1CAS成功代表获取资源成功,否则获取资源失败。

不可获取,判断当线程是不是持有锁的线程,如果是,state重入计数,获取资源成功,否则获取资源失败。

image-20231017181634485

    /**
     * 非公平-获取资源
     */
    final boolean nonfairTryAcquire(int acquires) {
        //获取当前线程
        final Thread current = Thread.currentThread();
        //获取当前状态
        int c = getState();
        if (c == 0) { // state==0 代表资源可获取
            //cas设置state为acquires,acquires传入的是1
            if (compareAndSetState(0, acquires)) {
                //cas成功,设置当前持有锁的线程
                setExclusiveOwnerThread(current);
                //返回成功
                return true;
            }
        }
        //如果state!=0,但是当前线程是持有锁线程,直接重入
        else if (current == getExclusiveOwnerThread()) {
            //state状态+1
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            //设置state状态,此处不需要cas,因为持有锁的线程只有一个    
            setState(nextc);
            //返回成功
            return true;
        }
        //返回失败
        return false;
    }

FairSync

FairSync流程与NonfairSync基本一致,唯一的区别就是在CAS执行前,多了一步hasQueuedPredecessors函数,这一步就是判断当前线程是不是CLH队列被唤醒的线程,如果是就执行CAS,否则获取资源失败。

image-20231017181804586


static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        
        /**
         * 获取锁
         */
        final void lock() {
        //cas设置state为1失败,代表获取资源失败,执行AQS获取锁模板流程,否获取资源成功
            acquire(1);
        }

        /**
         * 获取资源
         */
        protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取state状态
            int c = getState();
            if (c == 0) { // state==0 代表资源可获取
                //1.hasQueuedPredecessors判断当前线程是不是CLH队列被唤醒的线程,如果是执行下一个步骤
               //2.cas设置state为acquires,acquires传入的是1
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //cas成功,设置当前持有锁的线程
                    setExclusiveOwnerThread(current);
                    //返回成功
                    return true;
                }
            }
            //如果state!=0,但是当前线程是持有锁线程,直接重入
            else if (current == getExclusiveOwnerThread()) {
                //state状态+1
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                //设置state状态,此处不需要cas,因为持有锁的线程只有一个 
                setState(nextc);
                //返回成功
                return true;
            }
            return false;
        }
    }

    /**
     * AQS获取锁模板函数,这是AQS类中的函数
     */
    public final void acquire(int arg) {
        /**
         * 我们只需要关注tryAcquire函数,后面的函数是AQS获取资源失败,线程节点进入CLH队列的细节流程,本文不关注
         */
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Lock的实现

    //同步器
    private final Sync sync;
    
    //默认使用非公平策略
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    //true-公平策略 false非公平策略
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    //同步器
    private final Sync sync;

    //默认使用非公平策略
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    //true-公平策略 false非公平策略
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    /**
     * 获取锁-阻塞
     */
    public void lock() {
        //基于sync实现
        sync.lock();
    }

    /**
     * 获取锁-阻塞,支持响应线程中断
     */
    public void lockInterruptibly() throws InterruptedException {
        //基于sync实现
        sync.acquireInterruptibly(1);
    }

    /**
     * 获取资源,返回是否成功状态-非阻塞
     */
    public boolean tryLock() {
        //基于sync实现
        return sync.nonfairTryAcquire(1);
    }

    /**
     * 获取锁-阻塞,支持超时 
     */
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        //基于sync实现    
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * 释放锁
     */
    public void unlock() {
        //基于sync实现
        sync.release(1);
    }

    /**
     * 创建条件变量
     */
    public Condition newCondition() {
        //基于sync实现
        return sync.newCondition();
    }

}

ConcurrentHashMap

ConcurrentHashMap 是一个存储 key/value 对的容器,并且是线程安全的。

image-20231017182254278

ConcurrentHashMap和HashMap:

相同之处:

数组、链表结构几乎相同,所以底层对数据结构的操作思路是相同的(只是思路相同,底层实现不同)。

都实现了 Map 接口,继承了 AbstractMap 抽象类,所以大多数的方法也都是相同的,HashMap 有的方法,ConcurrentHashMap 几乎都有,所以当我们需要从 HashMap 切换到 ConcurrentHashMap 时,无需关心两者之间的兼容问题。

不同之处:

红黑树结构略有不同,HashMap 的红黑树中的节点叫做 TreeNode,TreeNode 不仅仅有属性,还维护着红黑树的结构,比如说查找,新增等等;ConcurrentHashMap 中红黑树被拆分成两块,TreeNode 仅仅维护的属性和查找功能,新增了 TreeBin,来维护红黑树结构,并负责根节点的加锁和解锁。

新增 ForwardingNode (转移)节点,扩容的时候会使用到,通过使用该节点,来保证扩容时的线程安全。

重要属性

//这个Node数组就是ConcurrentHashMap用来存储数据的哈希表
transient volatile Node[] table
//这是默认的初始化哈希表数组大小
private static final int DEFAULT_CAPACITY = 16;
//转化为红黑树的链表长度阈值
static final int TREEIFY_THRESHOLD = 8
//这个标识位用于识别扩容时正在转移数据
static final int MOVED = -1
//计算哈希值时用到的参数,用来去除符号位
static final int HASH_BITS = 0x7fffffff;
//数据转移时,新的哈希表数组
private transient volatile Node[] nextTable;

Node:

链表中的元素为Node对象。

链表上的一个节点,内部存储了key、value值,以及他的下一个节点的引用,这样一系列的Node就组成一个链表。

ForwardingNode:

当进行扩容时,要把链表迁移到新的哈希表,在做这个操作时,会在把数组中的头节点替换为ForwardingNode对象。

ForwardingNode中不保存key和value,只保存了扩容后哈希表(nextTable)的引用。

此时查找相应node时,需要去nextTable中查找。

TreeBin

当链表转为红黑树后,数组中保存的引用为 TreeBin,TreeBin 内部不保存 key/value,他保存了 TreeNode的list以及红黑树 root。

TreeNode

红黑树的节点。

Put方法

1、如果数组为空,初始化,初始化完成之后,走 2。

2、计算当前槽点有没有值,没有值的话,cas 创建,失败继续自旋(for 死循环),直到成功,槽点有值的话,走 3。

3、如果槽点是转移节点(正在扩容),就会一直自旋等待扩容完成之后再新增,不是转移节点走 4。

4、槽点有值的,先锁定当前槽点,保证其余线程不能操作,如果是链表,新增值到链表的尾部,如果是红黑树,使用红黑树新增的方法新增;

5、新增完成之后 check 需不需要扩容,需要的话去扩容。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //计算hash
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //table是空的,进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //如果当前索引位置没有值,直接创建
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //cas 在 i 位置创建新的元素,当 i 位置是空时,即能创建成功,结束for自循,否则继续自旋
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果当前槽点是转移节点,表示该槽点正在扩容,就会一直等待扩容完成
        //转移节点的 hash 值是固定的,都是 MOVED
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //槽点上有值的
        else {
            V oldVal = null;
            //锁定当前槽点,其余线程不能操作,保证了安全
            synchronized (f) {
                //这里再次判断 i 索引位置的数据没有被修改
                //binCount 被赋值的话,说明走到了修改表的过程里面
                if (tabAt(tab, i) == f) {
                    //链表
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //值有的话,直接返回
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //把新增的元素赋值到链表的最后,退出自旋
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //红黑树,这里没有使用 TreeNode,使用的是 TreeBin,TreeNode 只是红黑树的一个节点
                    //TreeBin 持有红黑树的引用,并且会对其加锁,保证其操作的线程安全
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //满足if的话,把老的值给oldVal
                        //在putTreeVal方法里面,在给红黑树重新着色旋转的时候
                        //会锁住红黑树的根节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            //binCount不为空,并且 oldVal 有值的情况,说明已经新增成功了
            if (binCount != 0) {
                // 链表是否需要转化成红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                //这一步几乎走不到。槽点已经上锁,只有在红黑树或者链表新增失败的时候
                //才会走到这里,这两者新增都是自旋的,几乎不会失败
                break;
            }
        }
    }
    //check 容器是否需要扩容,如果需要去扩容,调用 transfer 方法去扩容
    //如果已经在扩容中了,check有无完成
    addCount(1L, binCount);
    return null;
}

Get方法

先获取数组的下标,然后通过判断数组下标的 key 是否和我们的 key 相等,相等的话直接返回,如果下标的槽点是链表或红黑树的话,分别调用相应的查找数据的方法,整体思路和 HashMap 很像。

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //计算hashcode
    int h = spread(key.hashCode());
    //不是空的数组 && 并且当前索引的槽点数据不是空的
    //否则该key对应的值不存在,返回null
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //槽点第一个值和key相等,直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //如果是红黑树或者转移节点,使用对应的find方法
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //如果是链表,遍历查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

扩容

扩容的方法叫做 transfer,transfer 方法的主要思路是:

  • 首先需要把老数组的值全部拷贝到扩容之后的新数组上,先从数组的队尾开始拷贝。

  • 拷贝数组的槽点时,先把原数组槽点锁住,保证原数组槽点不能操作,成功拷贝到新数组时,把原数组槽点赋值为转移节点。

  • 这时如果有新数据正好需要 put 到此槽点时,发现槽点为转移节点,就会一直等待,所以在扩容完成之前,该槽点对应的数据是不会发生变化的。

  • 从数组的尾部拷贝到头部,每拷贝成功一次,就把原数组中的节点设置成转移节点。

  • 直到所有数组数据都拷贝到新数组时,直接把新数组整个赋值给数组容器,拷贝完成。

扩容方法主要是通过在原数组上设置转移节点,put 时碰到转移节点时会等待扩容成功之后才能 put 的策略,来保证了整个扩容过程中肯定是线程安全的,因为数组的槽点一旦被设置成转移节点,在没有扩容完成之前,是无法进行操作的。

// 扩容主要分 2 步,第一新建新的空数组,第二移动拷贝每个元素到新数组中去
// tab:原数组,nextTab:新数组
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // 老数组的长度
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 如果新数组为空,初始化,大小为原数组的两倍,n << 1
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    // 新数组的长度
    int nextn = nextTab.length;
    // 代表转移节点,如果原数组上是转移节点,说明该节点正在被扩容
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 无限自旋,i 的值会从原数组的最大值开始,慢慢递减到 0
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // 结束循环的标志
            if (--i >= bound || finishing)
                advance = false;
            // 已经拷贝完成
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 每次减少 i 的值
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // if 任意条件满足说明拷贝结束了
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 拷贝结束,直接赋值,因为每次拷贝完一个节点,都在原数组上放转移节点,所以拷贝完成的节点的数据一定不会再发生变化。
            // 原数组发现是转移节点,是不会操作的,会一直等待转移节点消失之后在进行操作。
            // 也就是说数组节点一旦被标记为转移节点,是不会再发生任何变动的,所以不会有任何线程安全的问题
            // 所以此处直接赋值,没有任何问题。
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                // 进行节点的拷贝
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // 如果节点只有单个数据,直接拷贝,如果是链表,循环多次组成链表拷贝
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 在新数组位置上放置拷贝的值
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 在老数组位置上放上 ForwardingNode 节点
                        // put 时,发现是 ForwardingNode 节点,就不会再动这个节点的数据了
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 红黑树的拷贝
                    else if (f instanceof TreeBin) {
                        // 红黑树的拷贝工作,同 HashMap 的内容,代码忽略
                        …………
                        // 在老数组位置上放上 ForwardingNode 节点
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

如何保证线程安全

数组初始化时的线程安全:

数组初始化时,首先通过自旋来保证一定可以初始化成功,然后通过 CAS 设置 SIZECTL 变量的值,来保证同一时刻只能有一个线程对数组进行初始化,CAS 成功之后,还会再次判断当前数组是否已经初始化完成,如果已经初始化完成,就不会再次初始化,通过自旋 + CAS + 双重 check 等手段保证了数组初始化时的线程安全。

//初始化 table,通过对 sizeCtl 的变量赋值来保证数组只能被初始化一次
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //通过自旋保证初始化成功
    while ((tab = table) == null || tab.length == 0) {
        // 小于 0 代表有线程正在初始化,释放当前 CPU 的调度权,重新发起锁的竞争
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 赋值保证当前只有一个线程在初始化,-1 代表当前只有一个线程能初始化
        // 保证了数组的初始化的安全性
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 很有可能执行到这里的时候,table 已经不为空了,这里是双重 check
                if ((tab = table) == null || tab.length == 0) {
                    // 进行初始化
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

新增槽点值时的线程安全:

通过自旋死循环保证一定可以新增成功。

当前槽点为空时,通过 CAS 新增。

当前槽点有值,锁住当前槽点。

  • put 时,如果当前槽点有值,就是 key 的 hash 冲突的情况,此时槽点上可能是链表或红黑树,我们通过锁住槽点,来保证同一时刻只会有一个线程能对槽点进行修改。

ConcurrentHashMap在JDK1.7和JDK1.8的不同

数据结构:

  • Java 7:采用 Segment 分段锁来实现。
  • Java 8:使用数组 + 链表 + 红黑树。

并发度:

  • Java 7:每个 Segment 独立加锁,最大并发个数就是 Segment 的个数,默认是 16。
  • Java 8 中:锁粒度更细,理想情况下 table 数组元素的个数(数组长度)就是其支持并发的最大个数,并发度比之前有提高。

保证并发安全的原理:

  • Java 7:采用 Segment 分段锁来保证安全,Segment 是继承自 ReentrantLock。
  • Java 8:采用 Node + CAS + Synchronized 保证线程安全。

遇到 Hash 碰撞:

  • Java 7:在 Hash 冲突时,会使用拉链法。

  • Java 8:先使用拉链法,在链表长度超过一定阈值时,将链表转换为红黑树,来提高查找效率。

查询时间复杂度:

  • Java 7:遍历链表的时间复杂度是 O(n),n 为链表长度。

  • Java 8:如果变成遍历红黑树,那么时间复杂度降低为 O(log(n)),n 为树的节点个数。

锁机制

锁分类

偏向锁/轻量级锁/重量级锁:

偏向锁:

  • 一个对象被初始化后,还没有任何线程来获取它的锁时,那么它就是可偏向的。
  • 当有第一个线程来访问它并尝试获取锁的时候,它就将这个线程记录下来。
    • 以后如果尝试获取锁的线程正是偏向锁的拥有者,就可以直接获得锁,开销很小,性能最好。

轻量级锁:

  • 当锁原来是偏向锁的时候,被另一个线程访问,说明存在竞争,那么偏向锁就会升级为轻量级锁。
    • 线程会通过自旋的形式尝试获取锁,而不会陷入阻塞。

重量级锁:

  • 重量级锁是互斥锁,它是利用操作系统的同步机制实现的,所以开销相对比较大。

  • 当多个线程直接有实际竞争,且锁竞争时间长的时候,轻量级锁不能满足需求,锁就会膨胀为重量级锁。

  • 重量级锁会让其他申请却拿不到锁的线程进入阻塞状态。

性能对比:

  • 偏向锁性能最好,可以避免执行 CAS 操作。

  • 轻量级锁利用自旋和 CAS 避免了重量级锁带来的线程阻塞和唤醒,性能中等。

  • 重量级锁则会把获取不到锁的线程阻塞,性能最差。

可重入锁/非可重入锁:

可重入锁:

  • 线程当前已经持有这把锁了,能在不释放这把锁的情况下,再次获取这把锁。

不可重入锁:

  • 虽然线程当前持有了这把锁,但是如果想再次获取这把锁,也必须要先释放锁后才能再次尝试获取。

共享锁/独占锁:

共享锁:

  • 同一把锁可以被多个线程同时获得。

独占锁:

  • 这把锁只能同时被一个线程获得。

公平锁/非公平锁:

公平锁:

  • 如果线程现在拿不到这把锁,那么线程就都会进入等待,开始排队。
    • 在等待队列里等待时间长的线程会优先拿到这把锁。

非公平锁:

  • 它会在一定情况下,忽略掉已经在排队的线程,发生插队现象。
    • 整体执行速度更快,吞吐量更大,但同时也可能产生线程饥饿问题。

悲观锁/乐观锁:

悲观锁:

  • 在获取资源之前,必须先拿到锁,以便达到 独占 的状态。

    • 当前线程在操作资源的时候,其他线程由于不能拿到锁。
  • 适合用于并发写入多、临界区代码复杂、竞争激烈等场景。

    • 这种场景下悲观锁可以避免大量的无用的反复尝试等消耗。

乐观锁:

  • 它并不要求在获取资源前拿到锁,也不会锁住资源。

  • 乐观锁利用 CAS 理念,在不独占资源的情况下,完成了对资源的修改。

  • 适用于大部分是读取,少部分是修改的场景,也适合虽然读写都很多,但是并发并不激烈的场景。

自旋锁/非自旋锁:

自旋锁:

  • 如果线程现在拿不到锁,并不直接陷入阻塞或者释放 CPU 资源,而是开始利用循环,不停地尝试获取锁。
    • 优点是减少线程上下文切换的消耗,缺点是循环会消耗CPU。

非自旋锁:

  • 如果拿不到锁就直接放弃,或者进行其他的处理逻辑,例如去排队、陷入阻塞等。

可中断锁/不可中断锁:

不可中断锁:

  • 一旦线程申请了锁,只能等到拿到锁以后才能进行其他的逻辑处理。

可中断锁:

  • 在获取锁的过程中,突然不想获取了,那么也可以在中断之后去做其他的事情,不需要一直等到获取到锁才离开。

锁升级

img
锁升级细化流程:

img

锁优化

自适应的自旋锁:

自旋的时间不再固定,而是会根据最近自旋尝试的成功率、失败率

  • 以及当前锁的拥有者的状态等多种因素来共同决定。

  • 如果最近尝试自旋获取某一把锁成功了,那么下一次可能还会继续使用自旋,并且允许自旋更长的时间。

  • 但是如果最近自旋获取某一把锁失败了,那么可能会省略掉自旋的过程,以便减少无用的自旋,提高效率。

锁消除:

@Override
public synchronized StringBuffer append(Object obj) {
    toStringCache = null;
    super.append(String.valueOf(obj));
    return this;
}

如果编译器能确定这个 StringBuffer 对象只会在一个线程内被使用,就代表肯定是线程安全的

那么我们的编译器便会做出优化,把对应的 Synchronized 给消除

  • 省去加锁和解锁的操作,以便增加整体的效率。

锁粗化:

public void lockCoarsening() {
    synchronized (this) {
        //do something
    }
    synchronized (this) {
        //do something
    }
    synchronized (this) {
        //do something
    }
}

把同步区域扩大,也就是只在最开始加一次锁,并且在最后直接解锁

  • 那么就可以把中间这些无意义的解锁和加锁的过程消除。

  • 锁粗化不适用于循环的场景,仅适用于非循环的场景。

死锁

死锁就是两个或多个线程(或进程)被无限期地阻塞,相互等待对方手中资源的一种状态。

/**
 * 描述:     必定死锁的情况
 */
public class MustDeadLock implements Runnable {

    public int flag;
    static Object o1 = new Object();
    static Object o2 = new Object();

    public void run() {
        System.out.println("线程"+Thread.currentThread().getName() + "的flag为" + flag);
        if (flag == 1) {
            synchronized (o1) {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                synchronized (o2) {
                    System.out.println("线程1获得了两把锁");
                }
            }
        }
        if (flag == 2) {
            synchronized (o2) {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                synchronized (o1) {
                    System.out.println("线程2获得了两把锁");
                }
            }
        }
    }

    public static void main(String[] argv) {
        MustDeadLock r1 = new MustDeadLock();
        MustDeadLock r2 = new MustDeadLock();
        r1.flag = 1;
        r2.flag = 2;
        Thread t1 = new Thread(r1, "t1");
        Thread t2 = new Thread(r2, "t2");
        t1.start();
        t2.start();
    }
}

排查死锁:

通过jdk工具jps、jstack排查死锁问题。

jps:jdk提供的一个工具,可以查看到正在运行的java进程。

jstack:jdk提供的一个工具,可以查看java进程中线程堆栈信息。

堆栈信息中我们可以发现这个内容:Found one Java-level deadlock,表示程序中发现了一个死锁。

通过jdk提供的工具jconsole排查死锁问题。

jconsole:jdk提供的一个可视化的工具,方便排查程序的一些问题,如:程序内存溢出、死锁问题等等。

通过jdk提供的工具VisualVM排查死锁问题。

VisualVM:jdk提供的一个排查java程序问题的一个工具,可以监控程序的性能、查看jvm配置信息、堆快照、线程堆栈信息。

如何避免死锁

正确的顺序获得锁。

  • 死锁的根本原因就是获取锁的顺序是乱序的。

超时放弃。

  • 当线程获取锁超时了则放弃。

Synchronized

Synchronized可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块。

两类锁:

对象锁:

  • 方法锁(默认锁对象为this当前实例对象)和同步代码块锁(自己指定锁对象)。

类锁:

  • 修饰静态的方法或指定锁为Class对象。

底层实现

修饰方法:

  • 在字节码上给方法加了一个 flag:ACC_SYNCHRONIZED

代码块:

  • 通过 monitorenter 和monitorexit 两个指令进行控制的。
    • 本质上是通过 monitor 来实现的。

在Java虚拟机(HotSpot)中,Monitor是由ObjectMonitor实现的,其主要数据结构如下:

ObjectMonitor() {
    _header       = NULL; //markOop对象头
    _count        = 0; //记录个数
    _waiters      = 0, //等待线程数
    _recursions   = 0; //重入次数
    _object       = NULL; //监视器锁寄生的对象。锁不是平白出现的,而是寄托存储于对象中。
    _owner        = NULL;  //指向获得ObjectMonitor对象的线程或基础锁
    _WaitSet      = NULL; //处于wait状态的线程,会被加入到_WaitSet
    _WaitSetLock  = 0 ; 
    _Responsible  = NULL;
    _succ         = NULL;
    _cxq          = NULL;
    FreeNext      = NULL;
    _EntryList    = NULL; //处于等待锁block状态的线程,会被加入到该列表
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ; // _owner is (Thread *) vs SP/BasicLock
    _previous_owner_tid = 0; // 监视器前一个拥有者线程的ID
}

每个 Java 对象在 JVM 的对等对象的头中保存锁状态,指向 ObjectMonitor。

  • ObjectMonitor 保存了当前持有锁的线程引用
  • EntryList 中保存目前等待获取锁的线程
  • WaitSet 保存 wait 的线程。

计数器count:

  • 每当线程获得 monitor 锁,计数器 +1,当线程重入此锁时,计数器还会 +1。
  • 当计数器不为 0 时
    • 其它尝试获取 monitor 锁的线程将会被保存到EntryList中,并被阻塞。
  • 当持有锁的线程释放了monitor 锁后,计数器 -1。
  • 当计数器归位为 0 时,所有 EntryList 中的线程会尝试去获取锁
    • 但只会有一个线程会成功,没有成功的线程仍旧保存在 EntryList 中。
image-20231017173210058

加锁时,即遇到Synchronized关键字时

  • 线程会先进入monitor的_EntryList队列阻塞等待。

如果monitor的_owner为空,则从队列中移出并赋值与_owner

如果在程序里调用了wait()方法,wait方法会释放monitor锁

  • _owner赋值为null,并进入_WaitSet队列阻塞等待。

  • 这时其他在_EntryList中的线程就可以获取锁了。

当程序里其他线程调用了notify/notifyAll方法时

  • 就会唤醒_WaitSet中的某个线程,这个线程就会再次尝试获取monitor锁。

  • 如果成功,则就会成为monitor的owner。

当程序里遇到Synchronized关键字的作用范围结束时,就会将monitor的owner设为null,退出。

Synchronized和Lock的区别:

Synchronized属于JVM层面

  • Lock是API层面的东西,JUC提供的具体类。

Synchronized不需要用户手动释放锁

  • 当代码执行完毕之后会自动让线程释放持有的锁,Lock需要去手动释放锁。

Synchronized是不可中断的

  • 除非抛出异常或者程序正常退出,Lock可中断。

Synchronized是非公平锁

  • Lock默认是非公平锁,但是可以通过构造函数传入boolean类型值更改是否为公平锁。

Synchronized要么唤醒所有线程,要么随机唤醒一个线程

  • Lock可以使用condition实现分组唤醒需要唤醒的线程。

Synchronized只能同时被一个线程拥有

  • 但是 Lock 锁没有这个限制,例如在读写锁中的读锁,是可以同时被多个线程持有的。
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!