书籍介绍:https://book.douban.com/subject/26591326/
并发编程挑战
并发编程的目的是为了让程序运行得更快,但是,并不是启动更多的线程就能让程序最大限度地并发执行
- 在进行并发编程时,如果希望通过多线程执行任务让程序运动得更快,会面临非常多的挑战
- 比如上下文切换的问题、死锁的问题,以及受限于硬件和软件的资源限制问题
挑战一:上下文切换
多线程一定比单线程快么?
public class ConcurrencyTest {
private static final long count = 10001;
public static void main(String[] args) throws InterruptedException {
concurrency();
serial();
}
private static void concurrency() throws InterruptedException{
Long start = System.currentTimeMillis();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
int a = 0;
for (long i = 0; i < count; i++) {
a += 5;
}
}
});
thread.start();
int b = 0;
for (long i = 0; i < count; i++) {
b--;
}
thread.join();
long time = System.currentTimeMillis() - start;
System.out.println("concurrency:" + time + "ms, b = " + b);
}
private static void serial(){
long start = System.currentTimeMillis();
int a = 0;
for (long i = 0; i < count; i++) {
a += 5;
}
int b = 0;
for (long i = 0; i < count; i++) {
b--;
}
long time = System.currentTimeMillis() - start;
System.out.println("serial:" + time + "ms, b = " + b + ", a=" + a);
}
}
答案是并不一定,当测试量达到一百万的时候,并发才能比串行优势点(本代码环境结果)
- 线程创建和上下文切换都是需要开销的
如何减少上下文的切换?
无锁并发编程:
- 可以使用一些方法避免使用锁,如将数据的ID按照Hash算法取模分段,不同的线程处理不同段的数据
CAS算法:
- Java的
Atomic
包使用CAS
算法更新数据,而不需要加锁使用最少线程:
- 避免创建不需要的线程
协程:
- 在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换
挑战二:死锁
public class DeadLockDemo {
private static String A = "A";
private static String B = "B";
public static void main(String[] args) {
new DeadLockDemo().deadLock();
}
private void deadLock(){
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (A){
try {
Thread.sleep(2000);
} catch (InterruptedException e){
e.printStackTrace();
}
synchronized (B){
System.out.println("1");
}
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (B){
synchronized (A){
System.out.println("2");
}
}
}
});
thread1.start();
thread2.start();
}
}
避免死锁的几个常见方法:
- 避免一个线程同时获取多个锁
- 避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占一个资源
- 尝试使用定时锁,使用
lock.tryLock(timeout)
来替代使用内部锁机制- 对于数据库锁,加锁和解锁必须在一个数据库连接里
挑战三:资源限制的挑战
什么是资源限制?
指在进行并发编程时,程序的执行速度受限于计算机硬件资源或软件资源
资源限制引发的问题?
在并发编程中,将代码执行速度加快的原则是将代码中串行的部分变成并发执行
- 但是如果将某段串行的代码并发执行,因为受限于资源,仍然在串行执行
- 这时候程序不仅不会加快执行,反而会更慢,因为增加了上下文切换和资源调度的时间
如何解决资源限制的问题?
对于硬件限制,可以考虑集群并行执行程序
既然单机的资源有限制,就让程序在多机上运行
比如使用
ODPS、Hadoop
或者自己搭建服务器集群对于软件限制,可以考虑使用资源池将资源复用
比如使用连接池将数据库和Socket连接复用,或者在调用对方
webService
接口获取数据时,只建立一个连接
如何在资源限制的情况下进行并发编程?
根据不同的资源限制调整程序的并发度
- 比如下载文件程序依赖于两个资源——宽带和硬盘读写速度
有数据库操作时,设计数据库连接数,如果SQL语句执行非常快
- 而线程的数量比数据库连接数大很多,则某些线程会被阻塞,等待数据库连接
底层实现原理
Java代码在编译后 编程Java字节码,字节码被类加载器加载到JVM里
JVM
执行字节码,最终需要转化为汇编指令在CPU
上执行,Java中所使用的并发机制依赖于JVM的实现和CPU的指令
volatiled的应用
volatile是轻量级的
synchronized
,它在多处理器开发中保证了共享变量的可见性
- 可见性的意思是一个线程修改一个共享变量时,其他线程能读到这个修改的值
volatile的定义与实现原理
Java语言规范第3版中对
volatile
的定义如下:
- Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致地更新,线程应该确保通过排他锁单独获得这个变量
在了解
volatile
实现原理之前,我们来看下与其实现原理相关的CPU术语与说明
术语 | 英文单词 | 术语描述 |
---|---|---|
内存屏障 | memory barriers | 是一组处理器指令,用于实现对内存操作的顺序限制 |
缓冲行 | cache line | CPU高速缓存中可以分配的最小存储单位。处理器填写缓存行时会加载整个缓存行,现代CPU需要执行几把次CPU指令 |
原子操作 | atomic operations | 不可中断的一个或一系列操作 |
缓存行填充 | cache line fill | 当处理器识别到从内存中读取操作数是可缓存的,处理器读取整个高速缓存行到适当的缓存(L1,L2,L3的或所有) |
缓存命中 | cache hit | 如果进行高速缓存行填充操作的内存位置仍然是下次处理器访问的地址时,处理器从缓存行中读取操作数,而不是从内存读取 |
写命中 | write hit | 当处理器将错作数写回到一个内存缓存的区域时,它首先会检查这个缓存的内存地址是否在缓存行中,如果存在一个有效的缓存行,则处理器将这个操作数写回到缓存,而不是写回到内存 |
写缺失 | write misses the cache | 一个有效的缓存行被写入到不存在的内存区域 |
volatile是如何保证可见性的?
有
volatile
变量修饰的共享变量进行写操作的时候,会通过Lock指令来保证可见性
- 而Lock指令在多核处理器会引发两件事
- Lock前缀指令会引起处理器缓存行的数据回写到系统内存
- 这个回写内存的操作会使在其他处理器的缓存无效
Lock前缀指令会引起处理器缓存行的数据回写到系统内存:
Lock前缀指令导致在执行指令期间,声言处理器的
LOCK#
信号在多处理环境中,
LOCK#
信号确保在声言该信号期间
- 处理器可以独占任何共享内存
- 因为它会锁住总线,导致其他CPU不能访问总线,不能访问总线就意味着不能访问系统内存
但是,在最近的处理器里,
LOCK#
信号一般不锁总线,而是锁缓存,毕竟总线开销的比较大这个回写内存的操作会使在其他处理器的缓存无效:
IA-32处理器和Intel64处理器使用MESI(修改、独占、共享、无效
- 控制协议去维护内部缓存和其他处理器缓存的一致性
在多核处理器系统中进行操作的时候,IA-32处理器和Intel64处理器能嗅探其他处理器访问系统内存和它们的内部缓存
- 处理器使用嗅探技术保证他的内存缓存、系统内存和其他处理器的缓存的数据在总线上保持一致
volatile的使用优化
著名的Java并发编程大师Doug lea 在JDK7的并发包新增一个队列集合类Linked-TransferQueue
- 它在使用
volatile
变量时,用一种追加字节的方式来优化队列出队和入队的性能但是存在两种场景不应该使用这种方式:
- 缓存行非64字节宽的处理器
- 共享变量不会被频繁地写
Synchronized的实现原理与应用
Synchronized很多人称呼它为重量级锁。随着Java SE1.6对
Synchronized
进行各种优化之后,已经得到很大改善Synchronized实现同步的基础:Java中的每一个对象都可以作为锁,具体表现以下3种形式:
对于普通同步方法,锁是当前实例对象
对于静态同步方法,锁是当前类的Class对象
对于同步方法块,锁是
Synchronized
括号里配置的对象
锁到底存在哪里,锁里又会存储什么信息?
从JVM规范中可以看到
Synchronized
在JVM里的实现原理
- JVM基于进入和退出Monitor对象来实现方法同步和代码块同步,但两者实现细节不一样
代码块同步是使用
monitorenter
和monitorexit
指令实现的
- 而方法同步是使用另外一种方式实现,细节在JVM规范里并没有详细说明
但是,方法的同步同样可以使用这两个指令来实现
monitorenter指令是在编译后插入到同步代码块的开始位置
- 而monitorexit是插入到方法结束和异常处,JVM要保证每个
monitorenter
必须有对应的monitorexit与之配对任何对象都有一个monitor与之关联
- 当且一个monitor被持有后,它将处于锁定状态,线程执行到monitorexit指令时
- 将会尝试获取对象锁对应的monitor的所有权,即尝试获得对象的锁
Java对象头
Synchronized
用的锁是存在Java对象头里的
- 如果对象是数组类型,则虚拟机用3个字宽(word)存储对象头,如果对象是非数组类型
- 则用2个字宽存储对象头。在32位虚拟机中,1字宽等于4字节,即32bit
锁的升级与对比
Java SE 1.6为了减少获得锁和释放锁带来的性能消耗
- 引入了偏向锁和轻量级锁,在1.6中,锁一共有4中状态,级别从底到高依次是:
- 无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态。
它们之间随着竞争逐渐升级但不能降级,目的是为了提高获得锁和释放锁的效率
偏向锁
HotSpot
的作者经过研究发现,大多数情况下,锁不仅不存在多线程竞争
- 而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁
当一个线程访问同步块并获取琐时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID
- 以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁
- 只需简单地测试一下对象头的
Mark Word
里是否存储着指向当前线程的偏向锁如果测试成功,表示线程已经获得了锁
如果测试失败,则需要再测试一下
Mark Word
中偏向锁的标识是否设置成1(表示当前是偏向锁):
- 如果没有设置,则使用CAS竞争锁
- 如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程
偏向锁的撤销
偏向锁使用了一种等到竞争出现才释放锁的机制
- 所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁
偏向锁的撤销,需要等待全局安全点(在这个时间点上没有正在执行的字节码)
它会首先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否活着,不处于活动状态,则将对象头设置成无所状态
反之,拥有偏向锁的栈会被执行,遍历偏向对象的锁记录
- 栈中的锁记录和对象头
Mark Word
要么重新偏向于其他线程- 要么恢复到无锁或者标记对象不适合作为偏向锁,最后唤醒暂停的线程
下图演示了偏向锁初始化的流程,线程2演示了偏向锁撤销的流程:
关闭偏向锁
偏向锁在Java 6和Java 7里是默认启用的,但是它在应用程序启动几秒钟之后才激活
如有必要可是使用JVM参数来关闭延迟:
-XX:BiasedLockingStartupDelay = 0
如果你确定应用程序里所有的锁通常情况下处于竞争状态,可以通过JVM参数关闭偏向锁:
-XX:-UseBiasedLocking = false
轻量级锁
轻量级锁加锁:
线程在执行同步块之前,JVM会现在当前线程的栈帧中创建用于存储锁记录的空间
- 并将对象头中的
Mark Word
复制到锁记录中,官方称Displaced Mark Word
然后线程尝试使用CAS将对象头中的
Mark Word
替换为指向锁记录的指针
- 如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋锁来获取琐
轻量级锁解锁:
轻量级锁解锁时,会使用原子的CAS操作将
Displaced Mark Word
替换回到对象头,如果成功,则表示没有竞争发送
- 如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁
下图是两个线程同时争夺锁,导致锁膨胀的流程图
因为自旋会消耗
CPU
,为了避免无用的自旋(比如获得锁的线程被阻塞住了)
- 一旦锁升级成重量级锁,就不会再恢复到轻量级锁状态
当锁处于这个状态下,其他线程试图获取琐时,都会被组塞住
- 当持有锁的线程释放锁之后会唤醒这些线程,被唤醒的线程就会进行新一轮的夺锁之争
锁的优缺点对比
锁 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
偏向锁 | 加锁和解锁不需要额外的消耗,和执行非同步方法相比仅存在纳秒级差距 | 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 | 适用于只有一个线程访问同步块场景 |
轻量级锁 | 竞争的线程不会阻塞,提高了程序的响应速度 | 如果始终得不到锁竞争的线程,使用自旋会消耗CPU | 追求响应时间同步块执行速度非常快 |
重量级锁 | 线程竞争不适用自旋,不会消耗CPU | 线程阻塞,响应时间缓慢 | 追求吞吐量同步块实行速度较长 |
原子操作的实现原理
原子(atomic)本意是不能被进一步分割的最小粒子
- 而原子操作(
atomic operation
)意为不可被中断的一个或一系列操作
术语定义
处理器如何实现原子操作
32位IA-32处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作
首先处理器会自动保证基本的内存操作的原子性
- 处理器保证从系统内存中读取或者写入一个字节是原子的
- 意思是当一个处理器读取一个字节时,其他处理器不能访问这个字节的内存地址
Pentium6和最新的处理器能自动保证单处理器对同一个缓存行里进行16/32/64位的操作时原子的
- 但是复杂的内存操作处理器是不能自动保证其原子性的
- 比如跨总线宽度、跨多个缓存行和跨页表的访问
但是,处理器提供总线锁定和缓存锁定两个机制来保证复杂内存操作的原子性
使用总线锁保证原子性
提供一个
LOCK#
信号,当一个处理器在总线上输出此信号时
- 其他处理器的请求将被阻塞住,那么处理器可以独占共享内存
使用缓存锁保证原子性
内存区域如果被缓存在处理器的缓存行中,并且在Lock操作期间被锁定
- 那么当它执行锁操作回写到内存时,处理器不在总线上声言
LOCK#
信号
- 而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性
但是有两种情况下处理器不会使用缓存锁定:
- 当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(
cash line
)时,则处理器会调用总线锁定- 有些处理器不支持缓存锁定
Java如何实现原子操作
java中可以通过锁和循环CAS的方式来实现原子操作
使用锁机制实现原子操作:
锁机制保证了只有获得锁的线程才能够操作锁定的内存区域
JVM内部实现了很多种锁机制,有偏向锁、轻量级锁和互斥锁
- 除了偏向锁,JVM实现锁的方式都用了循环CAS
- 即当一个线程想进入同步块的时候使用循环CAS的方式获取锁,当它退出同步块的时候使用循环
CAS
释放锁
使用CAS实现原子操作:
JVM中的CAS操作正是利用处理器提供的
CMPXCHG
指令实现的自旋CAS实现的基本思路就是循环进行CAS操作直到成功为止
- 以下代码实现了一个基于CAS线程安全的计数器方法safeCount和一个非线程安全的计数器count
public class Counter {
private AtomicInteger atomicInteger = new AtomicInteger(0);
private int i = 0;
public static void main(String[] args) {
final Counter cas = new Counter();
List<Thread> ts = new ArrayList<>(600);
long start = System.currentTimeMillis();
for (int j = 0; j < 100; j++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
cas.count();
cas.safeCount();
}
}
});
ts.add(t);
}
for (Thread t : ts){
t.start();
}
//等待所有线程执行完成
for (Thread t : ts){
try{
t.join();
} catch (InterruptedException e){
e.printStackTrace();
}
}
System.out.println(cas.i);
System.out.println(cas.atomicInteger);
System.out.println(System.currentTimeMillis() - start);
}
//使用CAS实现线程安全计数器
private void safeCount(){
for (;;){
int i = atomicInteger.get();
boolean suc = atomicInteger.compareAndSet(i, ++i);
if (suc){
break;
}
}
}
//非线程安全技术器
private void count(){
i++;
}
}
输出:
987249
1000000
105
CAS实现原子操作的三大问题
ABA问题:
- 可以通过添加版本号,或者Java1.5开始JDKA的tomic包里提供了一个类AtomicStampedReference
- 这个类的compareAndSet方法就是检查当前引用和标志是否等于预期引用和标志
循环时间长开销大:
- JVM如果支持
pause
指令,效率有一定提升只能保证一个共享变量的原子操作:
- 用锁或者多个共享变量合并成一个
- JDK提供了
AtomicReference
类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作
内存模型上
Java内存模型的基础
并发编程模型的两个关键问题:
在并发编程中,需要处理两个关键问题:
线程之间如何通信:
- 指线程之间以何种机制来交换信息。在命令式编程中,线程之间的通信机制有两种(共享内存和消息传递)
- 在共享内存的并发模型里,线程之间共享程序的公共状态,通过写-读内存中的公共状态进行隐式通信
- 在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过发送消息来显式进行通信
线程之间如何同步:指程序中用于控制不同线程间操作发生相对顺序的机制
- 在共享内存并发模型里,同步是显式进行的,程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行
- 在消息传递的并发模型里,由于消息的发送必须在消息的接受之前,因此同步是隐式进行的
Java的并发采用的是共享内存模型,
Java
线程之间的通信总是隐式进行,整个通信过程对程序员完全透明
Java内存模型的抽象结构
在Java中,所有实例域、静态域和数组元素都存储在堆内存中,堆内存在线程之间共享
局部变量、方法定义参数和异常处理器参数不会再线程之间共享,它们不会又内存可见性问题,也不会收内存模型的影响
线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程以读/写共享变量的副本
本地内存是JMM(Java内存模型)的一个抽象概念,并不真实存在
- 它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化
从上图来看,线程A与线程B之间要通信的话,必须要经历下面2个步骤:
- 线程A把本地内存A中更新过的共享变量刷新到主内存中去
- 线程B到主内存中去读取线程A之前已更新过的共享变量
从整体来看,这两个步骤实质上是线程A在向线程B发送消息,而且这个通信过程必须要经过主内存
JMM
通过控制主内存与每个线程的本地内存之间的交互,来为Java程序员提供内存可见性保证
从源代码到指令序列的重排序
为了提高性能,编译器和处理器常常会对指令做重排序,重排序分3中类型
编译器优化的重排序:
- 编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序
指令级并行的重排序:
- 现代处理器采用了指令级并行技术(
Instruction-Level Parallelism ,ILP
)来将多条指令重叠执行- 如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序
内存系统的重排序:
- 由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行
从Java源代码到最终实际执行的指令序列,会分别经历下面3种重排序
- 源代码 –> 1:编译器优化重排序 –>:2:指令级并行重排序 –> 3:内存系统重排序 –> 最终执行的指令序列
1属于编译器重排序,2和3属于处理器重排序
对于编译器,JMM的编译器重排序规则会禁止特定类型的编译器重排序。
对于处理器重排序,JMM的处理器重排序规则会要求Java编译器在生成指令序列时
- 插入特定类型的内存屏障指令,通过内存屏障指令来禁止特定类型的处理器重排序
为了保证内存可见性,Java编译器在生成指令序列的适当位置会插入内存屏障指令来禁止特定类型的处理器重排序
JMM
把内存屏障指令分为4类,如下图:
Happens-before简介
从JDK 5开始,Java使用新的JSR-133内存模型(除非特别说明,本文针对的都是该模型)
- JSR-133使用
happens-before
的概念来阐述操作之间的内存可见性在JMM中,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作之间必须要存在
happens-before
关系
- 这里提到的两个操作既可以是一个线程之内,也可以是不同线程之间
happens-before规则如下:
程序顺序规则:一个线程中的每个操作,
happens-before
于该线程中的任意后续操作监视器锁规则:
- 对一个锁的解锁,
happens-before
于随后对这个锁的加锁volatile变量规则:
- 对一个volatile域的写,
happens-before
于任意后续对这个volatile域的读传递性:
- 如果A happens-before B,且B happens-before C,那么A happens-before C
注意:两个操作之间具有happens-before关系,并不意味着前一个操作必须要在后一个操作之前执行!
happens-before
仅仅要求前一个操作(执行的结果)对后一个操作可见,切前一个操作按顺序排在第二个操作之前
happens-before与JMM的关系如图:
如上图,一个happens-before规则对应于一个或多个编译器和处理器重排序规则
对于Java程序员来说,
happens-before
规则简单易懂
- 它避免Java程序员为了理解JMM提供的内存可见性保证而去学习复杂的重排序规则以及这些贵的具体实现方法
重排序
重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段
数据依赖性:
如果两个操作访问同一个变量,且这两个操作有一个为写操作,此时这两个操作之间就存在数据依赖性
数据依赖性分为3种类型
名称 | 代码示例 | 说明 |
---|---|---|
写后读 | a = 1;b = a; | 写一个变量之后,再读这个位置 |
写后写 | a = 1;a = 2; | 写一个变量之后,再写这个变量 |
读后写 | a = b;b = 1; | 读一个变量之后,再写这个变量 |
上面3中情况,只要重排序两个操作的执行顺序,程序的执行结果就会被改变
- 前面提到过,编译器和处理器可能会对操作作重排序,编译器和处理器在重排序时
- 会遵守数据依赖性,编译器和处理器不会改变存在数据依赖关系的两个操作执行顺序
这里所说的数据依赖性仅针对单个处理器中执行的指令序列和的那个线程中执行的操作
- 不同处理器之间和不同线程之间的数据依赖性不被编译器和处理器考虑
as-if-serial语义:
as-if-serial语义的意思是:不管怎么重排序,(单线程)程序的执行结果不能被改变
- 编译器和处理器和runtime都必须遵守
as-if-serial
语义
double pi = 3.14; //A
double r = 1.0; //B
double area = pi * r * r //C
如上A和C,B和C都存在数据依赖关系,因此C不能排在A或者B之前
- 但是A和B之间没有数据依赖关系,编译器和处理器是可以重排序A与B之间执行顺序的
volatile的内存语义
volatile变量自身具有下列特性:
- 可见性:
- 对一个
volatile
变量的读,总是能看到(任意线程)对这个volatile变量最后的写入- 原子性:
- 对任意单个
volatile
变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性
volatile写-读建立的happens-before关系
从JSR-133(JDK 1.5)开始,volatile变量的写-读可以实现线程之间的通信
从内存语义的角度来说,volatile的写-读与锁的释放-获取有相同的内存效果:
volatile
写和锁的释放有相同的内存定义volatile的读与锁的获取有相同的内存语义
下面使用volatile变量的示例代码:
public class VolatileExample {
int a = 0;
volatile boolean flag = false;
public void writer(){
a = 1; //1
flag = ture; //2
}
public void reader(){
if(flag){ //3
int i = a; //4
...
}
}
}
假设线程A执行writer()方法之后,线程B执行reader()方法
根据happens-before规则,这个过程建立的
happens-before
关系可以分为3类:
根据程序次序规则,1 happens-before 2, 3 happens-before 4
根据volatile规则,2 happens-before 3
根据happens-before传递性规则,1 happens-before 4
上述happens-before关系的图形化表示形式如下:
上图中,每一个箭头链接的两个节点,代表了一个
happens-before
关系黑色箭头表示程序顺序规则,橙色箭头表示volatile规则,蓝色箭头表示组合这些规则后提供的
happens-before
保证A线程写了一个volatile变量后,B线程读同一个
volatile
变量A线程在写volatile变量之前所有可见的共享变量,在B线程读同一个volatile变量后,将立即变得对B线程可见
volatile写-读的内存语义:
当写一个volatile变量时
- JMM会把该线程对应的本地内存中的共享变量值刷新到主内存
当读一个volatile变量时
- JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量
结合上图总结为:
线程A写一个volatile变量
- 实质上是线程A向接下将要读这个
volatile
变量的某个线程发出了(其对共享变量所做修改的)消息线程B读一个volatile变量
- 实质上是线程B接受了之前某个线程发出的(在写这个volatile变量之前对共享变量所做修改的)消息
线程A写一个volatile变量
- 随后线程B读这个volatile变量,这个过程实质上是线程A通过主内存向线程B发送消息
volatile内存语义的实现
为了实现volatile内存语义,JMM会分别限制这两个类型的重排序类型
下表是JMM针对编译器制定的volatile重排序规则表
是否能重排序 | 第二个操作 | ||
---|---|---|---|
第一个操作 | 普通的读/写 | volatile读 | volatile写 |
普通的读/写 | NO | ||
volatile读 | NO | NO | NO |
volatile写 | NO | NO |
从上表可以看出:
当第二个操作时volatile写时,不管第一个操作时什么,都不能重排序
- 这个规则确保volatile写之前的操作不会被编译器重排序到volatile写之后
当第一个操作时volatile读时,不管第二个操作是什么,都不能重排序
- 这个规则确保volatile读之后的操作不会被编译器重排序到volatile之前
当第一个操作时volatile写,第二个操作时volatile读时,不能重排序
为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序
- 对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎不可能
- 为此,JMM采取保守策略。下面是基于保守策略的JMM内存屏障插入策略
在每个volatile写操作的前面插入一个StoreStore屏障
在每个volatile写操作的后面插入一个StoreLoad屏障
在每个volatile读操作的后面插入一个LoadLoad屏障
在每个volatile读操作的后面插入一个
LoadStore
屏障
下面是保守策略下,volatile写插入内存屏障后生产的指令序列示意图:
上述的volatile写和volatile读的内存屏障插入策略非常保守
在实际执行时,只要不改变
volatile
写-读的内存语义,编译器可以根据具体情况省略不必要的屏障
public class VolatileBarrierExample {
int a;
volatile int v1 = 1;
volatile int v2 = 2;
void readAndWrite(){
int i = v1; //第一个volatile读
int j = v2; //地二个volatile读
a = i + j; //普通写
v1 = i + 1; //第一个volatile写
v2 = j + 1; //第二个volatile写
}
... //其他方法
}
针对
readAndWrite()
方法,编译器在生成字节码可以做如下的优化
注意:最后的StoreLoad屏障不能省略。因为第一个volatile写之后,方法立即return
- 此时编译器可能无法准确断定后面是否会有volatile读或写
- 为了安全起见,编译器通常会在这里插入一个StoreLoad屏障
JSR-133为什么要增强volatile的内存语义?
在JSR-133之前的旧Java内存模型中,虽然不允许volatile变量之间重排序
- 但旧的Java内存模型允许
volatile
变量与普通变量重排序volatile的写-读没有锁的释放-获取所具有的内存语义
为了提供一种比锁更轻量级的线程之间通信的机制,JSR-133专家组决定增强volatile的内存语义
- 严格限制编译器和处理器对
volatile
变量与普通变量的重排序
- 确保volatile的写-读和锁的释放-获取具有相同的内存语义
内存模型下
锁的内存语义
锁可以让临界区互斥执行
这里将介绍锁的另一个同样重要但常常被忽视的功能:
- 锁的内存语义
锁的释放-获取建立的happens-before关系
锁是Java并发编程中最重要的同步机制
- 锁除了让临界区互斥执行外,还可以让释放锁的线程向获取同一个锁的线程发送消息
下面是锁释放-获取的示例代码:
public class MonitorExample {
int a = 0;
public synchronized void writer(){ //1
a++; //2
} //3
public synchronized void reader(){ //4
int i = a; //5
...
} //6
}
假设线程A执行writer()方法,随后线程B执行reader()方法
根据happens-before规则,这个过程包含的
happens-before
关系可以分为3类
根据程序次序规则,1 happens-before 2,2 happens-before 3,4 happens-before 5,5 happens-before 6
根据监视器锁规则,3 happens-before 4
根据happens-before的传递性,2 happens-before 5
上述
happens-before
关系图形化表现形式如下图所示
上图每一个箭头链接的两个节点,代表一个
happens-before
关系黑色箭头表示程序顺序规则
橙色箭头表示监视器锁规则
蓝色箭头表示组合这些规则后提供的happens-before保证
在线程A释放锁之后,随后线程B获取同一个锁,2 happens-before 5
- 因此线程A在释放锁之前所有可见的共享变量,在线程B获取同一个锁之后,将立刻变得对B线程可见
锁的释放-获取的内存语义
当线程释放锁时,
JMM
会把该线程对应的本地内存中的共享变量刷新到主内存中当线程获取琐时,JMM会把该线程对应的本地内存置为无效
从而使得被监视器保护的临界区代码必须从主内存中读取共享变量
对比锁释放-获取的内存语义与
volatile
写-读的内存语义可以看出:
- 锁释放与volatile写有相同的内存语义,锁获取与volatile读有相同的内存语义
下面对锁释放-获取内存语义做个总结:
- 线程A释放一个锁
- 实质上是线程A向接下来将要获取这个锁的某个线程发出出了(线程A对共享变量所做修改的)消息
- 线程B获取一个锁
- 实质上是线程B接受了之前某个线程发出的(在释放这个锁之前对共享变量所做修改的)消息
- 线程A释放锁,随后线程B获取这个锁,这个过程实质上是线程A通过住内存向线程B发送消息
锁内存语义的实现
本文将借助
ReentrantLock
的源代码,来分析锁内存语义的具体实现机制在ReentrantLock中,调用
lock()
方法获取锁,调用unlock()
方法释放锁ReentrantLock的实现依赖于Java同步框架
AbstactQueuedSynchronizer
(本文简称之AQS)AQS使用一个整型的 volatile变量(命名为state)为维护同步状态
- 这个volatile变量是
ReentrantLock
内存语义实现的关键
ReentrantLock分为公平锁和非公平锁,我们首先分析公平锁
使用公平锁时,加锁方法lock()调用的轨迹如下:
ReentrantLock:lock()
FairSync:lock()
AbstactQueuedSynchronizer: acquire(int arg)
ReentrantLock:tryAcquire(int acquries)
在第4步真正开始加锁,下面是该方法的源代码
protected final boolean tryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState(); //获取锁的开始,首先读volatile变量的state
if(c ==0){
if(isFirst(current) && copareAndSetState(0,acquires)){
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()){
int nextc = c + acquires;
if(nextc < 0){
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
从上面的源代码我们可以看出,加锁方法是先读
volatile
变量state使用公平锁时,解锁方法
unlock()
调用轨迹如下:
ReentrantLock:unlock()
AbstactQueuedSynchronizer: release(int arg)
Sync:tryRelease(int release)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c); //释放锁的最后,写volatile变量state
return free;
}
从上面的源代码我们可以看出,在释放锁的最后写
volatile
变量state
- 公平锁在释放锁的最后写volatile变量state
- 在获取锁时首先读这个volatile变量
根据volatile的
happens-before
规则
- 释放锁的线程在写volatile变量之前可见的共享变量
- 在获取锁的线程读取同一个volatile变量后将立即变得对获取锁的线程可见
现在来分析非公平锁的内存语义的实现
- 非公平锁的释放和公平锁完全一样,所以这里仅仅分析非公平锁的获取
使用非公平锁时,加锁方法
lock()
,加锁方法lock()调用轨迹如下:
ReentrantLock:lock()
NonfairSync: lock()
AbstactQueuedSynchronizer:tryRelease(int release)
在第3步开始加锁,下面是该方法的源代码
protected final boolean compareAndSetState(int ,expect, int update){
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
该方法以原子操作的方式更新state变量
- 本来把Java的
compareAndSet()
方法调用简称为CASJDK文档对该方法的说明如下:
- 如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值
此操作具有
volatile
读和写的内存语义。现在对公平锁和非公平锁的内存语义做个总结:
- 公平锁和非公平锁释放时,最后都要写一个
colatile
变量state- 公平锁获取时,首先会去读volatile变量
- 非公平锁获取时,首先会用CAS更新volatile变量,这个操作同时具有volatile读和volatile写的内存语义
本文对ReentrantLock的分析可以看出,锁释放-获取的内存语义的实现至少有下面两种方式:
- 利用
volatile
变量的写-读所具有的内存语义- 利用CAS所附带的volatile读和volatile写的内存语义
concurrent包的实现
由于Java的CAS同时具有volatile读和volatile写的内存语义,因此Java线程之间的通信有下面4种方式
A线程写volatile变量,随后B线程读这个volatile变量
A线程写volatile变量,随后B线程用CAS更新这个volatile变量
A线程用CAS更新一个
volatile
变量,随后B线程用CAS更新这个volatile变量A线程用CAS更新一个volatile变量,随后B线程读这个volatile变量
Java的CAS会使用现代处理器上提供的高效机器级别的原子指令
- 这些原子指令以原子方式对内存执行读-改-写操作,这是在多处理器中实现同步的关键
同时,volatile变量的读/写和CAS可以实现线程之间的通信
把这些特性整合在一起,就形成了整个
concurrent
包得以实现的基石如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:
首先,声明共享变量为volatile
然后,使用CAS的原子条件更新来实现线程之间的同步
同时,配合以
volatile
的读/写和cas所具有的volatile读和写的内存语义来实现线程之间的通信以下是concurrent包的实现示意图
final域的内存语义
前面介绍锁和volatile相比,对final域的读和写更像是普通的变量访问
下面介绍final域的内存语义
final域的重排序规则
对于final域,编译器和处理器要遵守两个重排序规则
在构造函数内对一个final域的写入
- 与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序
初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序
写final域的重排序规则
写final域的重排序规则禁止把final域的写重排序到构造函数之外
这个规则的实现包含下面2个方面
JMM禁止编译器把final域的写重排序到构造函数之外
编译器会在final域的写之后,构造函数return之前,插入一个
StoreStore
屏障这个屏障禁止处理器把final域的写重排序到构造函数之外
现在让我们粉笔writer()方法
writer()
方法只包含一行代码:
finalExample = new FinalExample()
- `这行代码先构造一个FinalExample类型的对象,然后把这个对象引用赋值给引用变量的obj
假设线程B读对象引用与读对象的成员域之间没有重排序,下图是一种可能的执行时序
图中,写普通域的操作被编译器重排序到了构造函数之外
- 读线程B错误地读取了普通变量I初始化之前的值
而写final域的操作,被写final域的重排序规则限定在了构造函数之内
- 读线程B正确地读取了final变量初始化之后的值
写final域的重排序规则可以确保:
- 在对象引用为任意线程可见之前,对象的final域已经被正确初始化过了,而普通域不具有这个保障
读final域的重排序规则
读final域的重排序规则是,在一个线程中,初次读对象引用与初次读该对象包含的final域
- JMM禁止处理器重排序这两个操作(注意。这个规则仅仅针对处理器)
编译器会在读final域操作的前面拆入一个
LoadLoad
屏障初次读对象引用与初次读该对象包含的final域,这两个操作之间存在间接依赖关系
由于编译器遵守间接依赖关系,因此编译器不会重排序这两个操作
- 大多数处理器也会遵守间接依赖,也不会重排序这两个操作
- 但有少数处理器允许对存在间接依赖关系的操作做重排序(比如alpha处理器)
- 这个规则就是专门用来针对这种处理器的
reader()方法包含3个操作
初次读引用变量obj
初次读引用变量obj指向对象的普通域 j
初次读引用变量obj指向对象的final域 i
现在假设写线程A没有发生任何重排序,同事程序在不遵守哦间接依赖的处理器上执行,下图所示是一种可能的执行时序
上图中,读对象的普通域的操作被处理器重排序到读对象引用之前
读普通域时,该域还没有被写线程A写入,这是一个错误的读取操作
而读final域的重排序规则会把读对象final域的操作限定在读对象引用之后
- 此时该final域已经被A线程初始化过了,这是一个正确的读取操作
读final域的重排序规则可以确保:
- 在读一个对象的final域之前,一定会先读包含这个
final
域的对象的引用在这个示例程序中,如果该引用不为null,那么引用对象的final域一定已经被A线程初始化过了
上面看到是final域是基础数据类型,final域为引用类型将会有什么效果?
在构造函数内对一个final引用的对象的成员域的写入
- 与随后在构造函数外把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序
JSR-133为什么要增强final的语义
在旧的Java内存模型中,一个最严重的缺陷就是线程可能看到final域的值会改变
比如,一个线程当前看到一个整型final域值为0(还未初始化之前的默认值)
- 过一段时间之后这个线程再去读这个
final
域的值时,却发现值变为1(被某个线程初始化之后的值)最常见的例子就是在旧的Java内存模型中,String的值可能会改变
- 为了修补这个漏洞,JSR-133专家组增加了
final
的语义通过为final域增加写和读重排序规则,可以为Java程序员提供初始化安全保证:
- 只要对象是正确构造的(被构造对象的引用在构造函数中没有逸出)
- 那么不需要使用同步(指lock和
volatile
的使用)就可以保证任意线程都能看到这个final域在构造函数中被初始化之后的值
happens-before:
JSR-133使用happens-before的概念来指定两个操作之间的执行顺序
- 由于这两个操作可以在一个线程之内,也可以是在不同线程之间
因此,JMM可以通过happens-before关系向程序员提供跨线程的内存可见性保证
- 如果A线程的写操作A与B线程的读操作b之间存在
happens-before
关系,尽管a操作和b操作在不同的线程中执行
- 但JMM向程序员保证a操作将对b操作可见
happens-before定义如下:
如果一个操作happens-before另一个操作
- 那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前
两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行
- 如果重排序之后的执行结果,与按
happens-before
关系来执行的结果一直
- 那么这种重排序并不非法(也就是说,JMM允许这种重排序)
as-if-seria语义保证单线程内程序的执行结果不被改变
happens-before
关系保证正确同步的多线程程序的执行结果不被改变as-id-serial语义给编写单线程程序的程序员创造了一个幻境:
- 单线程程序是按照程序的顺序来执行的
happens-before关系给编写正确同步的多线程程序的程序员创造了一个幻境:
- 正确同步的多线程程序是按照
happens-before
指定的顺序来执行的
happens-before规则
程序顺序规则:
- 一个线程中的每个操作,happens-before与该线程中的任意后续操作
监视器锁规则:
- 对一个锁的解锁,happens-before与随后对这个锁的加锁
volatile变量规则:
- 对一个
volatile
域的写,happens-before于任意后续对这个volatile域的读传递性:
- 如果A happens-before B,且 B happens-before C,那么A happens-before C
start()规则:
- 如果线程A执行操作
ThreadB.start()
,那么A线程的ThreadB.start()
操作happens-before于线程B中的任意操作join()规则:
- 如果线程A执行操作
ThreadB.join()
并成功返回
- 那么线程B中任意操作happens-before于线程A从
ThreadB.join()
操作成功返回
Java内存模型综述
前面对Java内存模型的基础知识和内存模型的具体实现进行了说明
下面对Java内存模型的相关知识做一个总结
处理器的内存模型
顺序一致性内存模型是一个理论参考模型,JMM和处理器内存模型在设计时通常会以顺序一致性内存模型为参照
在设计时,JMM和处理器内存模型会对顺序一致性模型做一些放松,因为如果完全按照顺序一致性模型来实现处理器和JMM
- 那么很多的处理器和编译器优化都要被禁止,这对执行性能将会有很大的影响
根据对不同类型的读/写操作组合的执行顺序的放松,可以分为如下几种类型:
- 放松程序中写-读操作的顺序,由此产生了Total Store Ordering内存模型(简称TSO)
- 在上面的基础上,继续放松程序中写-写操作的顺序,由此产生了Partial Store Order内存模型(简称PSO)
- 在前面两条的基础上,继续放松程序中读-写和读-读操作的顺序
- 由此产生了Relaxed Memory Order内存模型(简称RMO)和PowerPC内存模型
注意,这里处理器对读/写操作的放松
- 是以两个操作之间不存在数据依赖性为前提的
- 因为处理器要遵守as-if-serial语义,处理器不会对存在数据依赖性的两个内存操作做重排序
下表展示了常见处理器内存模型的细节特征:
内存模型名称 | 对应的处理器 | Store-Load重排序 | Store-Store重排序 | Load-Load和Load-Store重排序 | 可以更早读取到其他处理器的写 | 可以更早读取到当前处理器的写 |
---|---|---|---|---|---|---|
TSO | sparc-TSO X64 | Y | Y | |||
PSO | sparc-PSO | Y | Y | Y | ||
RMO | ia64 | Y | Y | Y | Y | |
PowerPC | PowerPC | Y | Y | Y | Y | Y |
各种内存模型之间的关系:
JMM是一个语言及的内存模型,处理器内存模型是硬件级的内存模型,顺序一致性内存模型是一个理论参考模型
下面是语言内存模型、处理器内存模型和顺序一致性内存模型强弱对比示意图
从图中可以看出:常见的4中处理器内存模型比常用的3种语言内存模型要弱
- 处理器内存模型和语言内存模型都比顺序一致性内存模型要弱
同处理器内存模型一样,越是追求执行性能的语言,内存模型设计得会越弱
并发编程基础
线程简介
什么是线程?
现代操作系统在一个运行程序时,会为其创建一个进程
- 例如,启动一个Java程序,操作系统就会创建一个Java进程
现代操作系统调度的最小单元是线程,也叫轻量进程(
Light Weight Process
)
- 在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量
处理器在这些线程上高速切换,让使用者感觉到这些线程在同时执行
为什么要使用多线程?
更多的处理器核心:
- 线程是大多数操作系统调度的基本单元,一个程序作为一个进程来运行,程序运行过程中能够创建多个线程
- 而一个线程在一个时刻只能运行在一个处理器核心上
更快的响应时间:
- 将数据一致性不强的操作派发给其他线程处理(也可以使用消息队列),如生成订单快照、发送邮件等
- 这样做的好处是响应用户请求的线程能够尽可能快地处理完成
- 缩短了响应时间,提升了用户体验
更好的编程模型:
- Java为多线程编程提供了良好、考究并且一致的编程模型,使开发人员能够更加专注于问题的解决
- 即为所遇到的问题建立适合的模型,而不是绞尽脑汁地考虑如何将其多线程化
线程优先级
现代操作系统基本采用时分的形式调度运行的线程,操作系统会分出一个个时间片
- 线程会分配到若干时间片,当线程的时间片用完了就会发生线程调度,并等待着下次分配
在Java线程中,通过一个整型成员变量priority来控制优先级,范围是1~10
- 在线程构建的时候可以通过
setPriority(int)
方法来修改优先级,默认优先级是5注意:线程优先级不能作为程序正确性的依赖
- 因为操作系统可以完全不用理会Java线程对于优先级的设定
线程的状态
Java线程在运行的生命周期中可能处于6种不同的状态
- 在给定的一个时刻,线程只能处于其中的一个状态
状态名称 | 说明 |
---|---|
NEW | 初始状态,线程被构建,但是还没有调用start()方法 |
RUNNABLE | 运行状态,Java线程将操作系统中的就绪和运行两种状态笼统地称作“运行中” |
BLOCKED | 阻塞状态,表示线程阻塞于锁 |
WAITING | 等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定动作(通知或中断) |
TIME_WAITING | 超时等待状态,该状态不同于WAITING,它是可以在指定的时间自行返回的 |
TERMINATED | 终止状态,表示当前线程已经执行完毕 |
线程在自身的生命周期中,并不是固定地处于某个状态,而是随着代码的执行在不同的状态之间进行切换,如下图
由上图看出,线程创建之后,调用
start()
方法开始运行当线程执行wait()方法之后,线程进入等待状态
进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态
而超时等待状态相当于在等待状态的基础上增加了超时限制
也就是超时时间到达时将会返回到运行状态
当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到阻塞状态
线程在执行
Runnable的run()
方法之后将会进入到终止状态
注意:Java将操作系统中的运行和就绪两个状态合并称为运行状态
阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块(获取锁)时的状态
- 但是阻塞在
java.concurrent
包中Lock接口的线程状态却是等待状态
- 因为
java.concurrent
包中Lock接口对于住在的实现均使用了LoackSupport类中的相关方法
Daemon线程
Daemon线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持型工作
- 这意味着,当一个Java虚拟机中不存在非Daemon线程的时候,Java虚拟机将会退出
可以通过调用
Thread.setDaemon(true)
将线程设置为Daemon线程
- Daemon属性需要在启动线程之前设置,不能再启动线程之后设置
Daemon线程被用作完成支持性工作
- 但是在Java虚拟机退出时Daemon线程中的
finally
块并不一定会执行,如下代码
public class Daemon {
public static void main(String[] args) {
Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
thread.setDaemon(true);
thread.start();
}
static class DaemonRunner implements Runnable{
@Override
public void run(){
try {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
System.out.println("DaemonThread finally run");
}
}
}
}
运行Daemon程序,可以看到没有任何输出
main线程(非Daemon线程)在启动了线程DaemonRunner之后随着main方法执行完毕而终止
- 而此时Java虚拟机中已经没有非Daemon线程,虚拟机需要退出
Java虚拟机中的所有Daemon线程都需要立即终止,因此DaemonRunner终止
- 但是
DaemonRunner
中的finally块并没有执行注意:在构建Daemon线程时,不能依靠finally块中的内容来确保执行关闭或清理资源的逻辑
启动和终止线程
启动线程:
线程对象在初始化完成之后,调用
start()
方法就可以启动这个线程线程start()方法的含义是:
- 当前线程(即parent线程)同步告知Java虚拟机
- 只要线程规划器空闲,应立即启动调用start()方法的线程。
启动一个线程前,最好为这个线程设置设置线程名称,因为这样在使用jstack分析程序或者进行问题排查时
- 就会给开发人员提供一些提示,自定义的线程最好能够起个名字
理解中断:
中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行了中断操作
线程通过检查自身是否被中断来进行响应,线程通过方法
isInterrupted()
来进行判断是否被中断
- 也可以调用静态方法
Thread.interrupted()
对当前线程的中断标识位进行复位。如果该线程已经处于终结状态,即时该线程被中断过
在调用该线程对象的isInterrupted()时依旧会返回false.从Java的API中可以看到
许多声明抛出InterruptedException的方法(例如
Thread.sleep(long millis)
方法)
- 这些方法在抛出InterruptedException之前
- Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException
- 此时调用
isIntereupted()
方法将返回false
public class Interrupted {
public static void main(String[] args) throws Exception {
//sleepThread不停的尝试睡眠
Thread sleepThread = new Thread(new SleepRunner(), "SleepRunner");
sleepThread.setDaemon(true);
//busyThread不停的运行
Thread busyThread = new Thread(new BusyRunner(), "BusyRunner");
busyThread.setDaemon(true);
sleepThread.start();
busyThread.start();
//休眠5秒,让sleepThread和busyThread充分运行
TimeUnit.SECONDS.sleep(5);
sleepThread.interrupt();
busyThread.interrupt();
System.out.println("sleepThread.isInterrupted is " + sleepThread.isInterrupted());
System.out.println("busyThread.isInterrupted is " + busyThread.isInterrupted());
//防止sleepThread和busyThread立即退出
TimeUnit.SECONDS.sleep(2);
}
static class SleepRunner implements Runnable{
@Override
public void run(){
while (true){
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class BusyRunner implements Runnable{
@Override
public void run(){
while (true){
}
}
}
}
输出:
sleepThread.isInterrupted is false
busyThread.isInterrupted is true
从结果看出,抛出InterruptedException的线程SleepThread,其中断标识位被清除了
- 而一直忙碌运作的线程没有被清除
安全地终止线程
之前提到的中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互方式
- 而这种交互方式最适合用来取消或停止任务
除了中断以外,还可以利用一个booleam变量来控制是否需要停止任务并终止该线程
在下面例子中,创建了一个线程CountThread,它不断地进行变量累加,而主线程尝试对其进行中断操作和停止操作
public class Shutdown {
public static void main(String[] args) throws Exception {
Runner one = new Runner();
Thread countThread = new Thread(one, "countThread");
countThread.start();
//睡眠1秒,main线程对countThread进行中断,使countThread能够感知中断而结束
TimeUnit.SECONDS.sleep(1);
countThread.interrupt();
Runner two = new Runner();
countThread = new Thread(two, "countThread");
countThread.start();
//睡眠1秒,main线程对two进行中断,使countThread能够感知on为false中断而结束
TimeUnit.SECONDS.sleep(1);
two.cancel();
}
private static class Runner implements Runnable{
private long i;
private volatile boolean on = true;
@Override
public void run(){
while (on && !Thread.currentThread().isInterrupted()){
i++;
}
System.out.println("count i = " + i);
}
public void cancel(){
on = false;
}
}
}
输出结果如下所示(输出内容可能不同):
count i = 543487324
count i = 540898082
示例在执行过程中,main线程通过中断操作和cancel()方法均可使countThread得以终止
这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源
- 而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅
线程间通信
线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止
但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值
- 或者说价值很少,如果多个线程能够相互配合完成工作,这将带来巨大价值
volatile和synchronized关键字
Java支持多个线程同时访问一个对象或者对象的成员变量
由于每个线程可以拥有这个变量的拷贝
- 虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝
- 这样做的目的是加速程序的执行
- 所以线程在执行过程中,一个线程看到变量并不一定是最新的
关键字
volatile
可以用来修饰字段(成员变量),就是告知程序任何对改变量的访问均需要从共享内存中获取
- 而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性
关键字synchronized可以修饰方法或者以同步块的形式来进行使用
- 它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性
等待/通知机制
一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作
- 整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者
- 这种模式隔离了 做什么 和 怎么做,在功能层面上实现了解耦
如下代码:
while(value != desire){
Thread.sleep(1000);
}
dosomething();
在条件不满足时就睡眠1秒,这样做的目的是防止过快的 无效 尝试
- 这种方式看似能够解决实现所需的功能,但是却存在如下问题
难以确保及时性:
- 在睡眠时,基本不消耗处理器资源,但是如果睡的太久,就不能及时发现条件已经变了
难以降低开销:
- 如果降低睡眠的时间,比如1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成无端浪费
以上两个问题,看似矛盾难以调和,但是Java通过内置的等待/通知机制能够很好地解决这个矛盾并实现所需的功能
等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超累
java.lang.Object
上
方法名称 | 描述 |
---|---|
notify() | 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁 |
notifyAll() | 通知所有等待爱该对象上的线程 |
wait() | 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁 |
wait(long) | 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回 |
wait(long, int) | 对于超时时间更细粒度的控制,可以达到纳秒 |
等待/通知机制是指一个线程A调用了对象O的wait()方法进入等待状态
- 而另一个线程B调用了对象O的
notify()
或者notifyAll()
方法
- 线程A收到通知后从对象O的wait()方法返回,进而执行后续操作
上述两个线程通过对象O来完成交互,而对象上的wait()和notify()/notifyAll()的关系就如同开关信号一样
- 用来完成等待方和通知方之间的交互工作
在下列代码中,创建了两个线程——WaitThread和NotifyThread,前者检查flag值是否为false
- 如果符合要求,进行后续操作,否则在lock上等待,后者在睡眠了一段时间后对lock进行通知
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws Exception{
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Wait(), "NotifyThread");
notifyThread.start();
}
static class Wait implements Runnable{
@Override
public void run() {
//加锁,拥有lock的Monitor
synchronized (lock){
//条件不满足时,继续wait,同时释放了lock的锁
while (flag){
try {
System.out.println(Thread.currentThread() + "flag is true.wait@ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
}catch (InterruptedException e){
}
}
//条件满足时,完成工作
System.out.println(Thread.currentThread() + "flag is false.wait@"
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}
static class Nofity implements Runnable{
@Override
public void run() {
//加锁,拥有lock的Monitor
synchronized (lock){
//获取lock的锁,然后进行通知,通知时不会释放lock的锁
//直到当前线程释放了lock后,waitThread才能从wait方法中返回
System.out.println(Thread.currentThread() + "hold lock. notify@ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//再次加锁
synchronized (lock){
System.out.println(Thread.currentThread() + "hold lock again. sleep@"
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
输出如下:
Thread[WaitThread,5,main] flag is true.wait @ 22:23:03
Thread[NotifyThread,5,main] hold lock. notify @ 22:23:04
Thread[NotifyThread,5,main] hold lock again. sleep @ 22:23:09
Thread[WaitThread,5,main] flag is false.running @ 22:23:14
上述第3行和第4行输出的顺讯可能会互换
而上述例子主要说明了调用wait()、notify()以及notifyAll()时需要注意的细节,如下:
使用wait()、notify()以及
notifyAll()
时需要先对调用对象加锁使用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列
notify()或notifyAll()方法调用后,等待线程依旧不会从
wait()
返回
- 需要调用notify()或notifyAll()的线程释放锁之后,等待线程才有机会从wait()返回
notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中
- 而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED
从wait()方法返回的前提是获得了调用对象的锁
从上述细节中可以看到,等待/通知机制依托于同步机制
- 其目的就是确保等待线程从wait()方法返回时能够感知到通知线程对变量做出的修改
以下描述了上例过程:
在上图中,WaitThread首先获取了对象的锁,然后调用对象的wait()方法
- 从而放弃了锁并进入了对象的等待队列
WaitQueue
中,进入等待状态由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁
- 并调用对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时
WaitThread
的状态变为阻塞状态NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait()方法返回继续执行
锁相关
Lock接口
锁是用来控制多个线程访问共享资源的方式
- 一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)
在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的
而在Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能
- 它提供了与
synchronized
关键字类似的同步功能,只是在使用时需要显示地获取和释放锁虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性
- 但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种
synchronized
关键字所不具备的同步特性
Lock使用很简单,如下是Lock的使用方式:
Lock lock = new ReentrantLock();
lock.lock();
try{
}finally{
lock.unlock;
}
在finally块释放锁,目的是保证在获取到锁之后,最后能够被释放
不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常
- 异常抛出的同时,也会导致锁无故释放
Lock接口提供的synchronized
关键字所不具备的主要特性如下表:
特性 | 描述 |
---|---|
尝试非阻塞地获取锁 | 当前线程尝试获取锁,如果这一课时没有锁没有被其他线程获取到,则成功获取并持有锁 |
能被中断地获取锁 | 与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放 |
超时获取锁 | 在指定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回 |
Lock是一个接口,它定义了锁获取和释放的基本操作,Lock的API如下表:
方法名称 | 描述 |
---|---|
void lock() | 获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回 |
void lockInterruptibly() throws InterruptedException | 可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false |
boolean tryLock(long time, TimeUnit unit)throws InterruptedException | 超时的获取锁,当前线程哎以下三种情况会返回:1当前线程在超时时间内获得了锁2当前线程在超时时间内被中断3超时时间结束,返回false |
void unlock() | 释放锁 |
Condition newCondition() | 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁 |
队列同步器
队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架
- 它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作
- 并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态
- 在抽样方法的实现过程中免不了要对同步状态进行更改
- 这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect, int update))来进行操作
- 因为它们能够保证状态的改变是安全的
子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口
它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器可以支持独占式地获取同步状态
- 也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLock等)
队列同步器的接口与示例
同步器的设计是基础模版方法模式的,也就是说,使用者需要继承同步器并重写指定的方法
随后将同步器组合在自定义同步组件的实现中
并调用同步器提供的模版方法,而这些模版方法将会调用使用者重写的方法
重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态
- getState():获取当前状态
- setState(int newState):设置当前同步状态
- compareAndSetState(int expect, int update):
- 使用CAS设置当前状态,该方法能够保证状态设置的原子性
同步器可重写的方法与描述如下表:
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
实现自定义同步组件时,将会调用同步器提供的模版方法,这些(部门)模版方法与描述如下:
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回 |
boolean tryAcquireNanos(int arg, long nanos) | 在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true |
void acquireShared(int arg) | 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) | 与acquireShared(int arg)相同,该方法响应中断 |
boolean tryAcquireSharedNanos(int arg, long nanos) | 在acquireSharedInterruptibly(int arg)基础上增加了超时限制 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection<Thread> getQueuedThreads() |
获取等待在同步队列上的线程集合 |
同步器提供的模版方法基本上分为3类:
- 独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况
自定义同步组件将使用同步器提供的模版方法来实现自己的同步语义
顾名思义,独占锁就是在同一时刻只能有一个线程获取到锁
- 而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁,如下代码所示
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedLongSynchronizer{
//是否处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
//当状态为0的时候获取锁
public boolean tryAcquire(int acquires){
if(compareAndSetState(0, 1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,将状态设置0
protected boolean tryRelease(int releases){
if(getState() == 0) throw new IllegalArgumentException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//返回一个Condition,每个condition都包含一个condition队列
Condition newCondition(){return new ConditionObject();}
}
//仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
上述代码中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁
Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态
在tryAcquire(int acquires)方法中,如果经过CAS设置成功(同步状态设置为1)
- 则代表获取了同步状态,而在
tryRelease(int releases)
方法中只是将同步状态重置为0.用户使用Mutex时并不会直接和内部同步器的实现打交道- 而是调用Mutex提供的方法,在Mutex实现中,以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模版方法
acquire(int args)
即可- 当前线程调用该方法获取同步状态失败后被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛
重入锁
重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁
除此之外,该锁的还支持获取锁时的公平和非公平性选择
ReentrantLock虽然没能像
synchronized
关键字一样支持隐式的重进入,但是在调用lock()方法时
- 已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞
这里提到一个锁获取的公平性问题,如果在绝对时间上
先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之是不公平的
公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的
ReentrantLock提供了一个构造函数,能够控制锁是否是公平的
实现重进入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题
线程再次获取锁:
- 锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是则再次成功获取
锁的最终释放:
线程重复N次获取了锁,随后在第N次释放该锁后,其他线程能够获取到该锁
锁的最终释放要求锁对于获取进行技术自增,计数表示当前锁被重复获取次数
- 而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放
ReentrantLock是通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实现为例,获取同步状态的代码如下
final boolean nonfairTryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState();
if(c == 0){
if(compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}else if(current == getExclusiveOwnerThread(current)){
if nextc = c + acquires;
if(nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
改方法增加了再次获取同步状态的处理逻辑:
通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求
- 则将同步状态值进行增加并返回true,表示获取同步状态成功
成功获取锁的线程再次获取锁,只是增加了同步状态值
- 这也就要求ReentrantLock在释放同步状态时减少同步状态值,该方法的代码如下
protected final boolean tryRelease(int releases){
int c = getState() - releases;
if(Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();
boolean free = false;
if(c == 0){
free = true;
ExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果该锁被获取了N次,那么前(N - 1)次tryRelease(int releases)方法必须返回false
- 而只有同步状态完全释放了,才能返回true
可以看到,该方法将同步状态是否为0作为最终释放的条件
- 当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功
公平与非公平获取锁的区别
公平性与否是针对获取锁而言的,如果一个锁是公平的
- 那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO
对于非公平锁,只要CAS设置同步状态成功,则表示当前线程获取了锁
- 而公平锁则不同,如ReentrantLock的
tryAcquire
方法
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法与nonfairTryAcquire(int accquires)比较,唯一不同的位置为判断条件多了
hasQueuedPredecessors()
方法
- 即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true
- 则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁
读写锁
之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问
- 而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞
读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升
在没有读写锁支持的(Java 1.5之前)时候,如果需要完成上述工作就要使用Java等待通知机制,就是当写操作开始时
所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后
- 所有等待的读操作才能继续执行(写操作之间依靠
synchronized
关键进行同步)
- 这样做的目的是使读操作能正确读取到数据
改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁即可
当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后
- 所有操作继续执行,编程方式相对于使用等待通知机制的实现方式而言,变得简单明了
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读时对于写的
- 在读多余写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量
Java并发包提供读写锁的实现是
ReentrantReadWriteLock
,它提供的特性如下表
特性 | 说明 |
---|---|
公平性选择 | 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平 |
重进入 | 该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁 |
锁降级 | 遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁 |
读写锁的接口与示例
ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方法
而其实现——
ReentrantReadWriteLock
,除了接口方法之外,还提供了一些便于外界监控其内部工作状态的方法
方法名称 | 描述 |
---|---|
int getReadLockCount() | 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,例如,仅一个线程,它连续获取(重进入)了n次读锁,那么占据读锁的线程数是1,但该方法返回n |
int getReadHoldCount() | 返回当前线程获取读锁的次数。该方法在Java6中加入到ReentrantReadWriteLock中,使用ThreadLock保存当前线程获取的次数,这要是的Java6的实现变得更加复杂 |
boolean isWriteLocked() | 判断写锁是否被获取 |
int getWriteHoldCount() | 返回当前写锁被获取的次数 |
接下来通过一个缓存示例说明读写锁的使用方式,如下代码:
public class Cache {
static Map<String, Object> map = new HashMap<>();
static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
static Lock readLock = reentrantReadWriteLock.readLock();
static Lock writeLock = reentrantReadWriteLock.writeLock();
//获取一个key对应的value
public static final Object get(String key){
readLock.lock();
try {
return map.get(key);
}finally {
readLock.unlock();
}
}
//设置key对应的value,并返回旧的value
public static final Object put(String key, Object value){
writeLock.unlock();
try {
return map.put(key, value);
}finally {
writeLock.unlock();
}
}
//清空所有内容
public static final void clean(){
writeLock.unlock();
try {
map.clear();
}finally {
writeLock.unlock();
}
}
}
Cache组合一个非线程安全的HashMap作为缓存的实现
- 同时使用读写锁的读锁和写锁来保证Cache是线程安全的
在读操作方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞
写操作方法和clean方法,在更新hashmap时必须提前获取写锁
- 当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续
Cache使用读写锁提升读操作的并发性,也保证每次写操作对所有读写操作的可见性,同时简化了编程方式
读写锁的实现分析
接下来分析ReentrantReadWriteLock的实现,主要包括:
- 读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级
读写状态的设计:读写锁同意依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态
回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数
而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态
使得该状态的设计成为读写锁实现的关键
读写锁是根据位运算知道各自状态的,有兴趣的小伙伴可以自行去查资料
写锁的获取与释放:
- 写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态
如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程
- 则当前线程进入等待状态,获取写锁的代码如下
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
该方法除了重入条件(当前线程为获取 写锁的线程)之外,增加了一个读锁是否存在的判断
如果读锁存在,则写锁不能获取,原因在于:读写锁要确保写锁的操作对读锁可见
- 如果允许读锁在已被索取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作
因此只有等待其他线程都释放了读锁,写锁才能被当前线程获取
- 而写锁一旦被获取,则其他读写线程的后续访问均被阻塞
写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态
- 当写状态为0时,表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写现承诺的修改对后续读写线程可见
读锁的获取与释放:
- 读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时
- 读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态
锁降级:
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放
- 最后再获取读锁,这种分段完成的过程不能称之为锁降级
锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
LockSupport工具
当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作
LockSupport定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能
- 而LockSupport也成为构建同步组件的基础工具
LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及
unpark(Thread thread)
方法来唤醒一个被阻塞的线程Park有停车的意思,假设线程为车辆,那么park方法代表着停车
- 而unpark方法则是指车辆启动离开,这些方法以及描述如下表
方法名称 | 描述 |
---|---|
void park() | 阻塞当前线程,如果调用unpark(Thread thraed)方法或者当前线程被中断,才能从park()方法返回 |
void parkNanos(long nanos) | 阻塞当前线程,最长不超过nanos秒,返回条件在park()的基础上增加了超时返回 |
void parkUntil(long deadline) | 阻塞当前线程,直到deadline时间(从1970年开始到deadline时间的毫秒数) |
void unpark(Thread thread) | 唤醒处于阻塞状态的线程thread |
Condition接口
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上)
- 主要包括wait()、wait(long timeout)、nofity()以及notifyAll()方法
- 这些方法与
synchronized
同步关键字配合,可以实现等待/通知模式Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式
- 但是这两者在使用方式以及功能特性上还是有差别的
通过对比Object的监视器方法和Condition接口,可以更详细地了解Condition的特性,对比项与特性如下表:
对比项 | Object Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁调用Lock.newCondition()获取Condition对象 |
调用方式 | 直接调用 如:object.wait() | 直接调用 如:condition.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
Condition接口与示例:
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时
- 需要提前获取到Condition对象关联的锁
Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的
- 换句话说,Condition是依赖Lock对象的
Condition的使用方式比较简单,需要注意在调用方法前获取锁,使用方式如下代码所示:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException{
lock.lock();
try {
condition.await();
}finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException{
lock.lock();
try {
condition.signal();
}finally {
lock.unlock();
}
}
如示例所示,一般都会将Condition对象作为成员变量
当调用await()方法后,当前线程会释放锁并在此等待
- 而其他线程调用Condition对象的signal()方法,通知当前线程后
- 当前线程才从await()方法返回,并且在返回前已经获取了锁
并发容器和框架
ConcurrentHashMap的实现原理与使用
ConcurrentHashMap是线程安全且高效的hashmap
为什么要使用ConcurrentHashMap
在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非常低下
- 基于以上两个原因,便有了ConcurrentHashMap的登场机会
线程不安全的HashMap:
在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%
- 所以在并发情况下不能使用HashMap
例如一下代码
final HashMap<String, String> map = new HashMap<>(2);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}, "ftf" + i).start();
}
}
}, "ftf");
thread.start;
thread.join;
HashMap在并发执行put操作时会引起死循环,是因为多线程会导致
HashMap
的Entry链表形成环形数据结构
- 一旦形成环形数据结构,Entry的next节点永远不会为空,就会产生死循环获取Entry
效率低下的HashTable
HashTable容器使用synchronized来保证线程安全
- 但在线程竞争激烈的情况下HashTable的效率非常低下。
因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态
ConcurrentHashMap的锁分段技术可有效提升并发访问率
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁
假如容器里有多把锁,每一把锁用于锁容器的一部分数据,那么当多线程访问容器里不同数据段的数据时
- 线程间就不会存在锁竞争,从而可以有效提高并发访问率
这就是
ConcurrentHashMap
所使用的锁分段技术。首先将数据分成一段一段地存储
- 然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他数据段也能被其他线程访问
ConcurrentHashMap的结构
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成
Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于储存键值对数据
一个ConcurrentHashMap里包含一个Segment数组
- Segment的结构和HashEntry类似,是一种数组和链表结构
一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素
- 每个Segment守护着一个HashEntry数组里的元素
- 当对HashEntry数组的数据进行修改时,必须首先获得与它对于的Segment锁
ConcurrentHashMap的初始化:
ConcurrentHashMap初始化方法是通过initialCapacity,loadFactor和concurrencyLevel等几个参数
- 来初始化Segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组来实现的
初始化segments数组:
- 让我们来看一下初始化segments数组的源代码
if(concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS
int sshift = 0;
int ssize = 1;
while (ssize < DEFAULT_CONCURRENCY_LEVEL) {
++sshift;
ssize <<= 1;
}
int segmentShift = 32 - sshift;
int segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
由上面代码可知,segments数组的长度ssize是通过concurrencyLevel计算得出的
为了能通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方
- 所以必须计算出一个大于或等于
concurrencyLevel
的最小的2的N次方值来作为segments数组的长度concurrencyLevel的最大值是65535,这意味着segments数组的长度最大为65536,对应的二进制是16位
初始化segmentShift和segmentMask:这两个全局变量需要在定位segment时的散列算法里使用
- sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16
- 1需要向左移位移动4次,所以sshift等于4
segmentShift用于定位参与散列运算的位数,segmentShift等于32减去sshift,所以等于28
- 这里之所以用32是因为ConcurrentHashMap里的
hash()
方法输出的最大数是32位的segmentMask是散列运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1.因为ssize的最大长度是65536
- 所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1
初始化每个segment:
输入参数initalCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子
- 在构造方法里需要通过两个参数来初始化数组中的每个segment
定位segment
既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候
- 必须先通过散列算法定位到segment
ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列
之所以进行再散列,目的是减少散列冲突
- 使元素能够均匀地分布在不同的Segment上,从而提高容器的存取效率
假如散列的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义
默认情况下segmentShift为28,segmentMask为15,再散列后的数量最大是32位二进制数据,向右无符号移动28位
- 意思是让高4位参与到散列运算中,(
hash>>>segmentShit)&segmentMask
的运算结果分别是4,15,7和8
- 可以看到散列值没有发生冲突
ConcurrentHashMap的操作
get操作:Segment的get操作实现非常简单和高效
先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment
- 再通过散列算法定位到元素,代码如下
public V get(Object key){
int hash = hash(key.hashCode());
return segmentFor(hash).get(key,hahsh)
}
get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空才会加锁重读
它的get方法里将要使用的共享变量都定义成volatile类型
- 如用于统计当前Segment大小的count字段和用于存储值的HashEntry的value
定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值
- 但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖与原值)
在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是因为根据Java内存模型的happen before原则
对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量
- get操作也能拿到最新的值,这是用volatiel替换锁的经典应用场景
transient volatile int count;
volatile V value;
在定位元素的代码里我们可以发现,定位HashEntry和定位Segment的散列算法虽然一样,都与数组的长度减去1再相与
- 但是想与的值不一样,定位segment使用的是元素的hashcode通过再散列后得到的值的高位
- 而定位HashEntry直接使用的是再散列后的值
其目的是避免两次散列后的值一样,虽然元素在Segment里散列开了,但是却没有再HashEntry里散列开
put操作:
由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁
put方法首先定位到Segment,然后再Segment里进行插入操作。插入操作需要经历两个步骤
- 第一步判断是否需要对Segment里的HashEntry数组进行扩容
- 第二步添加元素的位置,然后将其放在HashEntry里
是否需要扩容:
在插入元素前会先判断Segment里的HashEntry数组是否超过容量,如果超过阙值,则对数组进行扩容
值得一提的是,Segment的扩容判断比
HashMap
更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的
- 如果达到了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容
如何扩容:
在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里
为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个
segment
进行扩容
size操作:
如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和
Segment里的全局变量count是一个colatile变量,那么在多线程场景下
- 是不是直接把所有的Segment的count相加就可以得到整个ConcurrentHashMap大小呢?
- 不是的,虽然相加时可以获取每个Segment的count最新值,但是可能累加前使用count发生了变化,那么统计结果就不准了
所以,最安全的做法是在统计size的时候把所有Segment的put,remone和clean方法全部锁住
但是这种做法显然非常抵消。因为在累加count操作过程中,之前累加过的count发生变化的几率非常小
所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小
- 如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment大小
ConcurrentHashMap如何判断统计时候发生了变化呢?
使用
modCount
变量,在put,remove和clean方法里操作元素前都会将变成modCount进行加1
- 那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化
ConcurrentLinkedQueue
在并发编程中,有时候需要使用线程安全的队列
如果要实现一个线程安全的队列有两种方式:
- 一种是使用阻塞算法,另一种是使用非阻塞算法
使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用于不同的锁)等方式实现
非阻塞的实现方式则可以使用循环CAS的方式实现
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序
- 当我们添加一个元素的时候,它会添加到队列的尾部
- 当我们获取一个元素时候,它会返回队列头部的元素
它采用了wait-free算法(即CAS算法)来实现,概算在
Michael&Scoft
算法上进行了一些修改
ConcurrentLinkedQueue的结构:
ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(node)由节点元素(item)和指向下一个节点(next)的引用组成
- 节点和节点之间就是通过这个next关联起来,从而组成一张链接结构的队列
默认情况下head节点存储的元素为空,tail节点等于head节点
private transient volatile Node<E> tail = head;
入队列
入队列的过程
入队列就是将入队节点添加到队列的尾部
- 为了方便理解入队时队列的变化,以及head节点和tail几点的变化,这里以一个示例来展开介绍
假设我们想在一个队列依次插入4个节点,每添加一个节点就做了一个队列的快照图,如下
添加元素1:
- 队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素的1节点
添加元素2:
- 队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点
添加元素3:
- 设置tail节点的next节点为元素3节点
添加元素4:
- 设置元素3的next几点为元素4节点,然后将tail节点指向元素4节点
通过调试入队过程并观察head节点和tail节点的变化,发现入队主要做两件事情:
第一是将入队节点设置成当前队列尾节点的下一个节点;
第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点
反之,则将入队及诶单设置成tail节点的next节点,所以节点不总是尾节点
如果是多线程进入入队,我们通过源码来详细分析它是如何使用
CAS
算法来入队的
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
从源代码角度来看,整个入队过程主要做两件事情:
第一是定位出尾节点
第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试
定位尾节点:
tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点,尾节点可能是tail节点
- 也可能是tail节点的next节点,代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点的可能是尾节点
获取tail节点的next节点需要注意的是p节点等于p的next节点情况
- 只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚刚初始化,正准备添加节点
- 所以需要返回head节点,获取p节点的next节点代码如下
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
设置入队节点为尾节点
p.casNext(null, n)方法用于将入队及诶单设置为当前队列尾尾节点的next节点
- 如果p是null,表示p是当前队列的尾节点,如果不为null,表示有其他线程更新了尾节点
- 则需要重新获取当前队列的尾节点
hops的设计意图
上面分析过对于先进先出的队列入队所要做的事情是将入队节点设置成尾节点
- dong lea写的代码和逻辑还是稍微有点复杂,那么我们用一下方式来实现是否可行?
public boolean offer(E e){
if(e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e);
for(;;){
Node<E> t = tail;
if(t.casNext(null, n) && casTail(t, n)) return true;
}
}
让tail节点永远作为队列的尾节点,这样实现代码了非常少,而且逻辑清晰和易懂
但是,这么做有一个缺点,每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次数,就能提高入队的效率
- 所有dong lea使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将tail节点更新成尾节点
而是当tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长,使用CAS更新tail节点的次数就会越少
但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率
因为从本质上来看它通过增加对volatile变量的读操作来减少对
volatile
变量的写操作
- 而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升
注意:入队方法永远返回true,所以不要通过返回值判断入队是否成功
出队列
出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用
每个节点出队的快照如下
由图可知,并不是每次出队时都更新head节点,当head节点里有元素时
- 直弹出head节点里的元素,而不会更新head节点
只有当head节点里没有元素时,出队操作才会更新head节点
- 这种做法也是通过hops变量来减少使用
CAS
更新head节点的消耗,从而提高出队效率
Java中的阻塞队列
什么是阻塞队列
阻塞队列(
BlockingQueue
)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法支持阻塞的插入方法:
- 意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满
支持阻塞的移除方法:
- 意思是在队列为空时,获取元素的线程会等待队列变为非空
阻塞队列常用语生产者和消费者的场景,生产者向队里里添加元素,消费者是从队列里取元素的线程
- 阻塞队列就是生产者存放元素,消费者获取元素的容器
在阻塞队列不可用时,这两个附加操作提供了4种处理方式
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
抛出异常:当队列满时,如果再插入元素,会抛出IllegalStateException异常
- 当队列空时,从队列里获取元素会抛出
NoSuchElementException
异常返回特殊值:
- 当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法
- 则是从队列里取出一个元素,如果没有则返回null
一直阻塞:当阻塞队列满时,如果生产者往队列里put元素,队列会一直阻塞生产者,知道队里可用或者响应中断退出
当队列为空时,如果消费者从队列里take元素,队列会阻塞消费者线程,直到队列不为空
超时退出:
- 当阻塞队列满时,如果生产者往队里里插入元素,队列会阻塞生产者线程一段时间
- 如果超过了指定的时间,生产者线程就会退出
注意:如果是无界阻塞队列,队里不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞
- 而且使用offer方法时,该方法永远返回true
Java里的阻塞队列
JDK提供了7种阻塞队列:
ArrayBlockingQueue:
- 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue:
- 一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue:
- 一个支持优先级排序的无界阻塞队列
DelayQueue:
- 一个使用优先级队列实现的无界阻塞队列
SynchronousQueue:
- 一个不存储元素的阻塞队列
LinkedTransferQueue:
- 一个由链表结构组成的无界阻塞队列
LingkedBlockingDeque:
- 一个由链表结构组成的双向阻塞队列
ArrayBlockingQueue:
ArrayBlockingQueue是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序
- 默认情况下不保证线程公平的访问队列。
LinkedBlockingQueue:
LinkedBlockingQueue是一个用链表实现的有界阻塞队列
此队列的默认和最大长度为
Integer.MAX_VALUE
- 此队列按照先进先出的原则对元素进行排序
PriorityBlockingQueue:
PriorityBlockingQueue是一个支持优先级的无界阻塞队列
- 默认情况下元素采取自然顺序升序排列
也可以自定义类实现cpmpareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时
- 指定构造参数Comparator来对元素进行排序,需要注意的是不能保证同优先级元素的顺序
DelayQueue:
DelayQueue是一个支持延时获取元素的无界阻塞队列
队列使用PriorityQueue来实现
队列中的元素必须实现Delayed接口
在创建元素时可以指定多久才能从队列中获取当前元素
- 只有在延迟期满时才能从队列中提取元素
DelayQueue非常有用,可以运用在以下场景:
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue
- 一旦能从DelayQueue中获取元素时,表示缓存有效期到了
- 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间
- 一旦从DelayQueue中获取到任务就开始执行,比如TimeQueue就是使用DelayQueue实现的
SynchronousQueue:
SynchronousQueue是一个不存储元素的阻塞队列
- 每一个put操作必须等待一个take操作,否则不能继续添加元素
它支持公平访问队列,默认情况下现场采用非公平性策略访问队列
- 使用以下构造方法可以创建公平性访问SynchronousQueue,如果设置成true,则等待的线程会采用先进先出的顺序访问队列
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程
- 队列本身并不存储任何元素,非常适合传递性创建
SynchronousQueue的吞吐量高于
LinkedBlockingQueue
和ArrayBlockingQueue
LinkedTransferQueue:
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列
- 相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法
tryTransfer:
- tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者
- 如果没有消费者等待接受元素,则返回false
- 和transfer方法的区别是tryTransfer方法无论消费者是否接受,方法立即返回,而transfer方法是必须等到消费者消费了才返回
对于带有时间限制的tryTransfer(E e,long timeout, TimeUnit unit)方法
- 试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待知道的时间再返回
- 如果超时还没有消费元素则返回false,反之返回true
transfer:
- 如果当前有消费者正在等待接受元素(消费者使用take()方法或带时间限制的poll()方法)
- transfer方法可以把生产者传入的元素立刻
transfer
(传输)给消费者
- 如果没有消费者在等待接受元素,transfer方法将会存放在队列的tail节点,并等到该元素被消费者消费了才返回
LingkedBlockingDeque:
LingkedBlockingDeque是一个由链表结构组成的双向阻塞队列
所谓双向队列指的是可以从队列的两端插入和移出元素,双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争
相比其他阻塞队列,
LingkedBlockingDeque
多了addFirst,addLast,offerFirst,offerLast,peekFirst和peekLast等方法
- 以First单词结尾的方法,表示插入。获取(peek)或移除双端队列的第一个元素
以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst
- 但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚
在初始化LingkedBlockingDeque时可以设置容量防止其过度膨胀
- 另外,双向阻塞队列可以运用在工作窃取模式中
阻塞队列的实现原理:
如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?
使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者
- 当消费者消费了一个队列中的元素后,会通知生产者当前队列可用
- 通过查看JDK源码发现
ArrayBlockingQueue
使用了Condition来实现
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
Fork/Join框架
什么是Fork/Join框架:
Fork/Join框架是Java7提供的一个用于并行执行任务的框架
- 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
工作窃取算法:
工作窃取算法(work-stealing)是指某个线程从其他队列里窃取任务来执行
优点:
- 充分利用线程进行并行计算,减少了线程间的竞争
缺点:
- 在某些情况下还是存在竞争,比如双端队列里只有一个任务时
- 并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列
Fork/Join框架的设计:
步骤1:分割任务
- 首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小
步骤2:执行任务并合并结果
- 分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行
- 子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据
Fork/Join使用两个类来完成以上两个事情
ForkJoinTask:
- 我们要使用Fork/Join框架,必须首先创建一个Fork/Join任务
- 它提供在任务中执行fork()和join()操作的机制
- 通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于有返回结果的任务
- ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队里的头部
- 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
Fork/Join框架的异常处理:
ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常
所有ForkJoinTask提供了
isCompletedAbnormally()
方法来检查任务是否已经抛出异常或已经被取消了并且可以通过ForkJoinTask的getException方法获取异常
getException方法返回Throwable对方
如果任务被取消了则返回
CancellationException
- 如果任务没有完成或者没有抛出异常则返回null
Fork/Join框架的实现原理:
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务
而ForkJoinWorkerThread数组负责执行这些任务
ForkJoinTask的fork方法实现原理:
- 当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果
- pushTask方法把当前任务存放在ForkJoinTask数组队列里
- 然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务
ForkJoinTask的join方法实现原理:
- join方法的主要作用是阻塞当前线程并等待获取结果
- 首先它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有4种:
- 已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)
- 如果任务状态是已完成,则直接返回任务结果
- 如果任务状态是被取消,则直接抛出
CancellationException
- 如果任务时抛出异常,则直接抛出对应异常
在
doJoin()
方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL
如果出现异常,则记录异常。并将任务状态设置为EXCEPTIONAL
使用 Fork/Join框架:
让我们通过一个简单的需求来使用Fork/Join框架,需求是:计算1+2+3+4的结果
使用Fork/Join框架首先要考虑到时如何分割任务
如果希望每个子任务最多执行两个数相加,那么我们设置分割的阙值是2,由于是4个数字相加,所以Fork/Join框架会把这个任务fork成两个子任务
子任务一负责计算1+2,子任务而负责3+4,然后join两个子任务的结果
- 因为是有结果的任务,所以必须继承ResursiveTask,实现代码如下:
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;//阙值
private int start;
private int end;
public CountTask(int start, int end){
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if(canCompute){
for (int i = start; i <= end; i++) {
sum += i;
}
}else {
//如果任务大于阙值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//生成一个计算任务,负责计算1+2+3+4
CountTask countTask = new CountTask(1, 4);
//执行一个任务
Future<Integer> result = forkJoinPool.submit(countTask);
try {
System.out.println(result.get());
}catch (InterruptedException e){
}catch (ExecutionException e){
}
}
}
通过这个例子,我们进一步理解ForkJoinTask,ForkJoinTask与一般任务的主要区别在于它需要实现compute方法
- 在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务
反之就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法
- 看着当前子任务是否需要继续分割成子任务,如果不需要继承分割,则执行当前子任务并返回结果
使用join方法会等待子任务执行完并得到其结果
原子操作类
当程序更新一个变量时,如果多线程同时更新这个变量,可能得到期望之外的值
- 比如变量i = 1;A线程更新i + 1,B线程也更新i + 1,经过两个线程操作之后可能i不等于3,而是等于2
因为A和B线程在更新变量i的时候拿到的i都是1,这就是线程不安全的更新操作
- 通常我们会使用synchronized来解决这个问题,synchronized会保证多线程不会同时更新变量i
而Java从JDK1.5开始提供了
java.util.concurrent.atomic
(以下简称Atomic包)这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式
因为变量的类型有很多种,所以在Atomic包里一共提供了13个类,属于4种类型的原子更新方式
- 分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)
Atomic包里的类基本都是使用Unsafe实现的包装类
原子更新基本类型类
使用原子的方式更新基本类型,Atomic包提供了以下3个类:
- AtomicBoolean:原子更新布尔类型
- AtomicInteger:原子更新整型
- AtomicLong:原子更新长整型
以上3个类提供的方法几乎一模一样,所以本节仅以AtomicInteger为例进行详解,AtomicInteger的常用方法如下:
int addAndGet(int delta):
- 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果
boolean compareAndSet(int expect, int update):
- 如果输入的数值等于预期值,则以原子方式将该值设置为输入的值
int getAndIncrement():
- 以原子方式将当前值加1,注意,这里返回的是自增前的值
void lazySet(int newValue):
- 最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值
int getAndSet(int newValue):
- ·以原子方式设置为newValue的值,并返回旧值
AtomicInteger示例代码如下
public class AtomicIntegerTest {
static AtomicInteger atomicInteger = new AtomicInteger(1);
public static void main(String[] args) {
System.out.println(atomicInteger.getAndIncrement());
System.out.println(atomicInteger.get());
}
}
输出1 2
那么getAndIncrement()是如何实现原子操作的呢?让我们一起分析其实现原理
public final int getAndIncrement(){
for(;;){
int current = get();
int next = current + 1;
if(compareAndSet(current, next)) return current;
}
}
public final boolean compareAndSet(int expect, int update){
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
源码中for循环体的第一步先取得AtomicInteger里存储的数值,第二步对AtomicInteger的当前数值进行加1操作
关键的第三步调用compareAndSet方法来进行原子更新操作,该方法先检查当前数值是否等于current
等于意味着AtomicInteger的值没有被其他线程修改过,则将AtomicInteger的当前值更新成next的值
- 如果不等compareAndSet方法会返回false,程序会进入for循环重新进行compareAndSet操作
Atomic包提供了3种基本类型的原子更新,但是Java的基本类型里还有char、float和double等
那么如何原子的更新其他的基本类型呢?
Atomic包里的类基本都是使用Unsafe实现的,我们发现Unsafe只提供了3种CAS方法:compareAndSwapObject、compareAndSwapInt和compareAndSwapLong
再看AtomicBoolean源码,发现它是先把Boolean转换成整型,再使用compareAndSwapInt进行CAS
- 所以原子更新char、float和double变量也可以用类似的思路来实现。
原子更新数组
通过原子的方式更新数组里的某个元素,Atomic包提供了以下4个类:
- AtomicIntegerArray:原子更新整型数组里的元素
- AtomicLongArray:原子更新引用类型数组里的元素
- AtomicIntegerArray:主要是提供原子的方式更新数组里的整型,其常用的方法如下
- int addAndSet(int i, int delta):以原子方式将输入值与数组中索引i的元素相加
- boolean compareAndSet(int i, int expect, int update):如果当前值等于预期值,则以原子方式将数组位置i的元素设置成update值
以上几个类提供的方法几乎一样,本节仅以AtomicIntegerArray为例进行详解
public class AtomicIntegerArrayTest {
static int[] value = new int[]{1, 2};
static AtomicIntegerArray ai = new AtomicIntegerArray(value);
public static void main(String[] args) {
ai.getAndSet(0, 3);
System.out.println(ai.get(0));
System.out.println(value[0]);
}
}
输出
3
1
需要注意的是数组value通过构造方法传递进去,然后AtomicIntegerArray会将当前数组复制一份
- 所以当AtomicIntegerArray对内部的数组元素进行修改时,不会影响传入的数组
原子更新引用类型
原子更新基本类型的AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用这个原子更新引用类型提供的类
Atomic包提供了以下3个类:
- AtomicReference:原子更新引用类型
- AtomicReferenceFieldUpdate:原子更新引用类型里的字段
- AtomicMarkableReference:原子更新带有标记的引用类型
- 可以原子更新一个布尔型的标记位和引用类型
- 构造方法是AtomicMarkableReference(V initialRef, boolean initialMark)
以上几个类提供的方法几乎一样,本节以AtomicReference为例
public class AtomicReferenceTest {
public static AtomicReference<User> atomicReference = new AtomicReference();
public static void main(String[] args) {
User user = new User("ff", 15);
atomicReference.set(user);
User updateUser = new User("XX",17);
atomicReference.compareAndSet(user, updateUser);
System.out.println(atomicReference.get().getName());
System.out.println(atomicReference.get().getOld());
}
static class User{
private String name;
private int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getOld() {
return old;
}
public void setOld(int old) {
this.old = old;
}
}
}
输出
XX
17
代码中首先构建一个user对象,然后把user对象设置进AtomicReference中
- 最后调用compareAndSet方法进行原子更新操作,实现原理同AtomicInteger里的
compareAndSet
方法
原子更新字段类
如果需要原子地更新某个类里的某个字段时,就需要使用原子更新字段类,Atomic包提供了以下3个类进行原子字段更新:
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
- AtomicLongFieldUpdater:原子更新长整型字段的更新器
- AtomicStampedReference:原子更新带有版本号的引用类型该类将整数值与引用关联起来
- 可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题
要想原子地更新字段类需要两步。第一步因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdate()创建一个更新器
- 并且需要设置想要更新的类和属性。第二步,更新类的字段(属性)必须使用
public volatile
修饰符
以上3个类提供的方法几乎一样,本节以AtomicIntegerFieldUpdater为例
public class AtomicIntegerFieldUpdateTest {
//创建原子更新器,并设置需要更新的对象类和对象的属性
private static AtomicIntegerFieldUpdater<User> af = AtomicIntegerFieldUpdater.newUpdater(User.class, "old");
public static void main(String[] args) {
//设置年龄是10岁
User user = new User("ff", 10);
//增加一岁,但是仍然会输出旧的年龄
System.out.println(af.getAndIncrement(user));
//输出现在的年龄
System.out.println(af.get(user));
}
public static class User{
private String name;
public volatile int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getOld() {
return old;
}
public void setOld(int old) {
this.old = old;
}
}
}
输出
10
11
并发工具类
等待多线程完成的CountDownLatch
countDownLatch允许一个或多个线程等待其他线程完成操作
public class CountDownLatchTest {
static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException{
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
countDownLatch.countDown();
System.out.println(2);
countDownLatch.countDown();
}
}).start();
countDownLatch.await();
System.out.println(3);
}
}
countDownLatch的构造函数接受一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N
当我们调用countDownLatch的countDown方法时,N就会减去1,countDownLatch的await方法会阻塞当前线程,直到N变成零
- 由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤
用在多个线程时,只需要把这个countDownLatch的引用传递到线程里即可
如果有个线程处理的比较慢,我们不可能让主线程一直等待
- 所以可以使用另外一个带指定时间的await方法——
await(long time, TimeUnit unit)
,这个方法等待特定时间后,就会不再阻塞当前线程注意:计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程
CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值
一个线程调用countDown方法happen-before,另外一个线程调用await方法
同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)
它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞
- 直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
CyclicBarrier简介
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程输了
- 每个线程调用的await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
public class CyclicBarrierTest {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try{
cyclicBarrier.await();
}catch (Exception e){
} System.out.println(1);
}
}).start();
try {
cyclicBarrier.await();
}catch (Exception e){
}
System.out.println(2);
}
}
因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出,1 2或者2 1
如果把new CyclicBarrier(2)改成new CyclicBarrier(3),则主线程和子线程会永远等待
- 因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行
CyclicBarrier还提供一个更高级的构造函数
CyclicBarrier(int parties, Runnable barrierAction)
- 用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,如下代码
public class CyclicBarrierTest2 {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try{
cyclicBarrier.await();
}catch (Exception e){
} System.out.println(1);
}
}).start();
try {
cyclicBarrier.await();
}catch (Exception e){
}
System.out.println(2);
}
static class A implements Runnable{
@Override
public void run(){
System.out.println(3);
}
}
}
因为CyclicBarrier设置了拦截线程的数量是2,所以必须等代码中的第一个线程和线程A都执行之后
- 才会继续执行住线程,然后输出2,所以代码执行后的输出如下:3 1 2
CyclicBarrier和CountDownLatch的区别
countDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置
所以CyclicBarrier能处理更为复杂的业务场景
- 例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaitting方法获取CyclicBarrier阻塞的线程输了
isBroken()方法用来了解阻塞的线程是否被中断
控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源
应用场景
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接
假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取
但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这是我们必须控制只有10个线程同时获取数据库连接保存数据
- 否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("save date");
semaphore.release();
}catch (InterruptedException e){
}
}
});
}
threadPool.shutdown();
}
}
在代码中,虽然有30个线程在执行,但是只允许10个并发执行
Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量
Semaphore用法简单,首先线程使用Semaphore的acquire()方法获取一个许可证
- 使用完之后调用
release()
方法归还许可证。还可以用tryAcquire()方法尝试获取许可证
线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类
Exchanger用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据
这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法
- 它会一直等待等二个线程也执行exchange方法,当两个线程都到达同步点时
- 这两个线程就可以交换数据,将本线程生产出来的数据传递给对方
Exchanger的应用场景
Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对接,这时候会交换两人的数据,并使用交叉规则得出2个交配结果
Exchanger也可以用于校队工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水
- 为了避免错误,采用AB岗两人进行录入,录入到Excel后,系统需要加载这个两个Excel,并对两个Excel数据进行校队
public class ExchangerTest {
private static final Exchanger<String> eg = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A";
eg.exchange(A);
}catch (InterruptedException e){
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B";
String A = eg.exchange("B");
System.out.println("A录入的是:" + A +",B录入的是:" + B + ".A和B数据是否一致:" + A.equals(B));
}catch (InterruptedException e){
}
}
});
threadPool.shutdown();
}
}
如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生
- 避免一直等待,可以使用
exchange(V x, long timeout, TimeUnit unit)
设置最大等待时长
线程池
Java中的线程池是运用场景最多的并发框架,几乎所有需求异步或并发执行任务的程序都可以使用线程池
在开发过程中,合理地使用线程池能够带来3个好处
降低资源消耗:
- 通过重复利用已创建的线程降低线程创建和销毁造成的消耗
提高响应速度:
- 当任务到达时,任务可以不需要等到线程创建就能立即执行
提高线程的可管理性:
- 线程是稀缺资源,如果无限制地创建,不仅消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控
线程池的实现原理
当向线程池提交一个任务之后,线程池是如何处理这个任务呢?如图
从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下
线程池判断核心线程池里的线程是否都在执行任务
- 如果不是,则创建一个新的工作线程来执行任务
- 如果核心线程池里的线程都在执行任务,则进入下个流程
线程池判断工作队列是否已经满
- 如果工作队里没有满,则将新提交的任务存储在这个工作队列里,满了则交给饱和策略来处理这个任务
线程池判断线程池的线程数量是否已满
- 如果没有,则创建一个新的工作线程来执行任务任务,反之交给饱和策略来处理这个任务
ThreadPoolExecutor
执行execute()方法的示意图如下
ThreadPoolExecutor执行execute方法分下面4种情况:
如果当前运行线程少于corePoolSize,则创建新线程来执行任务(执行这一步需要获取全局锁)
如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(执行这一步需要获取全局锁)
如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝
- 并调用
RejectedExecutionHandler.rejectedExecution()
方法
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时
- 尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)
- 在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize)
几乎所有的
execute()
方法调用都是执行步骤2,而步骤2不需要获取全局锁
线程池的使用
线程池的创建:
我们可以通过ThreadPoolExecutor来创建一个线程池:
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, KeepAliveTime, timeUnit, runnableTaskQueue, threadFactory, rejectedExecutionHandler);
创建一个线程池需要输入几个参数如下:
corePoolSize(核心池大小):
- 当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程
- 等到需要执行的任务数大于等于线程池的核心线程数就不在创建
- 调用prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程
runnableTaskQueue(任务队列):
- 用于保存等待执行的任务的阻塞队列
- 可以选择以下几个阻塞队列:
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行排序
- LinkedBlockingQueue:是一个基于链表结构的阻塞队列,也是按照FIFO排序元素。吞吐量通常要高于ArrayBlockingQueue
- SynchronousBlocking:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列
maximumPoolSize(线程池最大数量):
- 线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会在创建新的线程执行任务
- 如果是无界的任务队列该参数没有效果
threadFactory:
- 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
rejectedExecutionHandler(饱和策略):
- 当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一个策略处理提交的新任务
- 这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常
- 在JDK.5中提供以下4种策略
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:只用调用者所在线程来运行任务
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
- DiscardPolicy:不处理,丢弃掉
当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务
KeepAliveTime(线程活动保持时间):
- 线程池的工作线程空闲后,保持存活的时间
- 所以任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率
timeUnit(线程活动保持时间的单位):
- 可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)
- 微妙(MICROSECONDS,千分之一毫秒)、纳秒(NANOSECONDS,千分之一微妙)
向线程池提交任务:
可以使用两个方法向线程池提交任务,分别为execute()和submit()方法
execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程执行成功
submit方法用于提交需要返回值的任务
线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值
get()方法会阻塞当前线程知道任务完成,而使用
get(long timeout, TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回
- 这时候有可能任务没有执行完
关闭线程池:
可以通过调用线程池的shutdown和shutdownNow方法来关闭线程池
它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程
- 所以无法响应中断的任务可能永远无法终止。但是他们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP
- 然后尝试停止所以的正在执行或暂停任务的线程,并返回等待执行任务的列表
- 而shutdow只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true
当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true
至于应该调用哪一种方法,应该由提交到线程池的任务特性决定
- 通常调用shutdown方法来关闭线程池,如果任务不一定执行完,则可以调用shutdownNow方法
合理地配置线程池:
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析
- 任务的性质:CPU密集型任务、IO密集型任务和混合型任务
- 任务的优先级:高 中 低
- 任务的执行时间:长 中 短
- 任务的依赖性:是否依赖其他系统资源,如数据库连接
性质不同的任务可以用不同规模的线程池分开处理
- CPU密集型任务应配置可能小的线程。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程
- 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行
注意:如果一直有优先级搞的任务提交到队里里,那么优先级低的任务可能永远不能执行
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行
线程池的监控
可以通过线程池提供的参数进行监控:
- taskCount:线程池需要执行的任务数量
- completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount
- largestPoolSize:线程池里曾经创建过的最大线程数量
- getPoolSize:线程池的线程数量
- getActiveCount:获取活动的线程数
通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法
- 也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控
Executor框架
在Java中,使用线程来异步执行任务
Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源
同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃
Java的线程既是工作单元,也是执行机制
从JDK1.5开始,把工作单元与执行机制分离开来
- 工作单元包括Runnable和Callable,而执行机制有Executor框架提供
Executor框架简介
Executor框架的两级调度模型:
在HotSpot VM的线程模型中,Java线程(
java.lang.Thread
)被一对一映射为本地操作系统线程Java线程启动时会创建一个本地操作系统线程
- 当该Java线程终止时,这个操作系统线程也会被回收
操作系统会调度所有线程并将它们分配给可用的CPU
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器将这些任务映射为固定数量的线程
在底层,操作系统内核将这些线程映射到硬件处理器上
这两种调度模型的示意图如下:
从图中可以看出,应用程序通过Executor框架控制上层的调度
而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制
Executor框架的结构与成员
Executor的结构和Executor框架包含的成员组件
Executor框架的结构
Executor框架主要由3大部分组成如下:
任务:包括被执行任务需要实现的接口,Runnable接口或Callable接口
任务的执行:包括任务执行机制的核心接口Executor,以及继续自Executor的ExecutorService接口
Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
异步计算的结果:包括接口Future和实现Future接口的TutureTask类
Executor框架包含的主要类和接口:
- Executor是一个接口,它是Executor框架的基础,它将任务的提交并任务的执行分离开来
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令
- ScheduledThreadPoolExecutor比Timer更灵活,功能更强大
Future接口和实现Future接口的FutureTask类,代表异步计算的结果
Runnable接口或Callable接口的实现类,都可以被ThreadPoolExecutor和ScheduledThreadPoolExecutor执行
Executor框架的使用示意图如下图
主线程 首先要创建实现Runnable后者Callable接口的任务对象
工具类Executors可以把一个Runnable对象封装为一个Callable对象(
Executors.callable(Runnable task
)或Executors.callable(Runnable task, Object result))
- 然后可以把Runnable对照直接交给ExecutorsService执行
(ExecutorService.execute(Runnable command))
或者也可以把Runnable对象或Callable对象提交给ExecutorService执行
(ExecutorsService.submit(Runnable task))
或ExecutorService.submit(Callable<T> task)
如果执行
ExecutorService.submit(...)
,ExecutorService将返回一个实现Future接口的对象(到目前的JDK为止,返回的是FutureTask对象)由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行
最后,主线程可以执行
FutureTask.get()
方法来等待任务执行完成主线程也可以执行
FutureTask.cancel(boolean mayInterruptIfRunning)
来取消次任务的执行
Executor框架的成员
本节将介绍Executor框架的主要成员:
ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors
ThreadPoolExecutor:ThreadPoolExecutor通常使用工厂类Executors来创建
Executors可以创建3种类型的ThreadPoolExecutor:SigleThreadExecutor、FixedThreadPool和CachedThreadPool
- SigleThreadExecutor:适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的应用场景
- FixedThreadPool:适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器
- CachedThreadPool:CachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器
ScheduledThreadPoolExecutor:
通常使用工厂类Executors来创建
Executors可以创建2种类型的ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor:
- 包含若干个线程的ScheduledThreadPoolExecutor,适用于需要多个后台线程执行周期任务
SingleThreadScheduleExecutor:
- 只包含一个线程的ScheduledThreadPoolExecutor,适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景
Future:Future接口和实现Future接口的FutureTask类用来表示异步计算的结果
当我们把Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduleThreadPoolExecutor时
- ThreadPoolExecutor或ScheduleThreadPoolExecuto会向我们返回一个FutureTask对象
- 有一点需要注意,到目前JDK1.8为止,Java通过上述API返回是一个FutureTask对象
- 但从API可以看出,Java仅仅保证返回的是一个实现了Future接口的对象,在将来的JJDK实现中,返回的可能不一定是
FutureTask
Runnable接口和Callable接口
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduleThreadPoolExecutor执行
他们之间的区别是Runnable不会返回结果,而Callable可以返回结果
除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable
下面是Executors提供的,把一个Runnable包装成一个Callable的API
public static Callable<Object> callable(Runable task) //假设返回对象Callable1
下面是Executors提供的,把一个Runnable和一个待返回的结果包装成一个Callable的API
public static <T> Callable<T> callable(Runablen task,T result) //假设返回对象Callable2
前面讲过,当我们把一个Callable对象(比如上面的Callable1或Callable2)提交给ThreadPoolExecutor或ScheduleThreadPoolExecutor执行时
submit(...)
会向我们返回一个FutureTask对象我们可以执行
FutureTask.get()
方法来等待任务执行完成当任务成功完成后
FutureTask.get()
将返回该任务的结果例如,如果提交的是对象
Callable1,FutureTask.get()
方法将返回null,如果是Callable2,FutureTask.get()将返回result对象
ThreadPoolExecutor详解
Executor框架最核心的类是ThraedPoolExecutor,它是线程池的实现类,主要由下列4个组件构成:
- corePool:核心线程池的大小
- maximumPool:最大线程池的大小
- BlockingQueue:用来暂时保存任务的工作队列
- RejectedExecutionHandler:
- 当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池大小且工作队列已满)
- execute()方法将要调用的Handler
通过Executor框架的工具类Executors,可以创建3种类型的ThreadPoolExecutor
- FixedThreadPool
- SigleThreadExecutor
- CachedThreadPool
FixedThreadPool的详解
FixedThreadPool被称为可重用固定线程数的线程池
FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads
当线程池中线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止
- 这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止
FixedThreadPool的execute()方法的运行流程:
如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务
在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
线程执行完任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行
FixedThreadPool使用误解的队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为
Integer.MAX_VALUE
)使用无界队列作为工作队列会对线程池带来如下影响:
当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
由于1,使用无界队列时maximumPoolSize将是一个无效参数
由于1和2,使用无界队列时keepAliveTime将是一个无效参数
由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务
- (不会调用
RejectedExetionHandler.rejectedExecution
方法)
SigleThreadExecutor的详解
SigleThreadExecutor是使用单个worker线程的Executor
SigleThreadExecutor的corePoolSize和maximumPoolSize被设置为1.其他参数与FixedThreadPool参数相同
SigleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列
- SigleThreadExecutor使用无界队列作为工作队列对线程池带来的影响与FixedThreadPool相同
SigleThreadExecutor运行流程如下:
如果当前运行的线程数少于corePoolSize,则创建一个新线程来执行任务
在线程池完成预热之后,将任务加入LinkedBlockingQueue
线程执行完1中的任务后,会在一个无线循环中反复从
LinkedBlockingQueue
获取任务来执行
CachedThreadPool的详解
CachedThreadPool是一个会根据需要创建新线程的线程池
CachedThreadPool的corePoolSize被设置为0,即corePool为空
maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的
- 这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximunPool是无界的
这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程
- 极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源
流程如下:
首先执行
SynchronousQueue.offer(Runnable task)
- 如果当前maximumPool中有空闲线程正在执行
SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
- 那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行步骤2
当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行
Synchronous.poll(keepAliveTime, TimeUnit.NANOSECONDS)
- 这种情况下,步骤1将失败
- 此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成
在步骤2中新创建的线程将任务执行完后,会执行
Synchronous.poll(keepAliveTime, TimeUnit.NANOSECONDS)
- 这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟
如果60秒内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务
- 否则,这个空闲线程将终止、由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源
前面提到过,SynchronousQueue是一个没有容量的阻塞队列
每个插入操作必须等待另一个线程的对应移除操作,反之亦然
- CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行
CachedThreadPool中任务传递示意图如下
ScheduledThreadPoolExecutor详解
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor
- 它主要用来在给定的延迟之后的运行任务,或者定期执行任务
ScheduledThreadPoolExecutor的功能与Timer类似,但ScheduledThreadPoolExecutor功能更强大、更灵活
Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数
ScheduledThreadPoolExecutor的运行机制
DelayQueue是一个无界队列:
- 所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义(设置maximumPoolSize的大小没有什么效果)
ScheduledThreadPoolExecutor的执行主要分为两大部分
当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时
- 会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务
ScheduledThreadPoolExecutor为了实现周期性的执行任务,对
ThreadPoolExecutor
做了如下修改
使用DelayQueue作为任务队列
获取任务的方式不同
执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor的实现
前面我们提到过,ScheduledThreadPoolExecutor会把待调度的任务(ScheduledFutureTask)放到一个DelayQueue中
ScheduledFutureTask主要包含3个成员变量,如下:
long型成员变量time,表示这个任务将要被执行的具体时间
long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
long型成员变量period,表示任务执行的间隔周期
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序
排序时,time小的排在前面(时间早的任务将被先执行)
如果两个ScheduledFutureTask的time相同:
- 就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)
首先,让我们看看ScheduledThreadPoolExecutor中的线程执行周期任务的过程
如下图:
线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())
- 到期任务是指ScheduledFutureTask的time大于等于当前时间
线程1执行这个ScheduledFutureTask
线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间
线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(
DelayQueue.add()
)下图是
DelayQueue.take()
的执行示意图
如图所示,获取任务分为3大步骤
获取Lock
获取周期任务
- 如果PriorityQueue为空,当前线程到Condition中等待;否则执行下面2.2
- 如果PriorityQueue的头元素的time时间比当前时间大,到condition中等待到time时间;否则执行下面的2.3
- 获取PriorityQueue的头元素(2.3.1);如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程(2.3.2)
释放Lock
- ScheduledThreadPoolExecutor在一个循环中执行步骤2,直到线程PriorityQueue获取到一个元素之后(执行2.3.1之后)才会退出无限循环(结束步骤2)
最后,ScheduledThreadPoolExecutor中的线程执行任务的步骤4
- 把ScheduledFutureTask放入DelayQueue中的过程
下图是
DelayQueue.add()
执行示意图
如图所示,添加任务分为3大步骤
获取 Lock 添加任务
- 向PriorityQueue添加任务
- 如果在上面2.1中添加的任务时PriorityQueue的头元素,唤醒在Condition中等待的所有线程
释放Lock
FutureTask详解
Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
FutureTask简介
FutureTask除了实现Future接口外,还实现了Runnable接口
因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(
FutureTask.run()
)根据FutureTask
- run()方法被执行的时机,FutureTask可以处于下面3中状态
未启动:
FutureTask.run()
方法还没有被执行之前,FutureTask处于未启动状态
- 当创建一个FutureTask,且没有执行FutureTask,run()方法之前,这个FutureTask处于未启动状态
已启动:
FutureTask.run()
方法被执行的过程中,FutureTask处于已启动状态已完成:
FutureTask.run()
方法执行完后正常结束,或被取消(FutureTask.cancel(...)
)- 或执行
FutureTask.run()
方法时抛出异常而异常结束,FutureTask处于已完成状态
下图是FutureTask状态迁移的示意图
当FutureTask处于未启动或已启动状态时,执行
FutureTask.get()
方法将导致调用线程阻塞当FutureTask处于已完成状态时,执行
FutureTask.get()
方法将导致调用线程立即返回结果或抛出异常当FutureTask处于未启动状态时,执行
FutureTask.cancel()
方法将导致此任务永远不会被执行当FutureTask处于已启动状态时,执行
FutureTask.cancel(true)
方法将以中断执行此任务线程的方式来试图停止任务当FutureTask处于已启动状态时,执行
FutureTask.cancel(false)
方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成)当FutureTask处于已完成状态时,执行
FutureTask.cancel(...)
方法将返回false下图是get方法和cancel方法的执行示意图
FutureTask的使用
可以把FutureTask交给Executor执行
也可以通过
ExecutorService.submit(...)
方法返回一个FutureTask
- 然后执行
FutureTask.get()
方法或FutureTask.cancel(...)
方法。除此之外,还可以单独使用FutureTask当一个线程需要等待另外一个线程把某个任务执行完后它才能继续执行,此时可以使用FutureTask
假设有多个线程执行若干任务,每个任务最多只能被执行一次
- 当多个线程试图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行