常用并发工具类!

Semaphore信号量

信号量来控制那些需要限制并发访问量的资源。

信号量会维护 许可证 的计数,而线程去访问共享资源前,必须先拿到许可证。

线程可以从信号量中去获取一个许可证,一旦线程获取之后,信号量持有的许可证就转移过去了。

所以信号量手中剩余的许可证要减一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SemaphoreDemo {

static Semaphore semaphore = new Semaphore(3);

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(50);
for (int i = 0; i < 1000; i++) {
service.submit(new Task());
}
service.shutdown();
}

static class Task implements Runnable {

@Override
public void run() {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "拿到了许可证,花费2秒执行慢服务");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("慢服务执行完毕," + Thread.currentThread().getName() + "释放了许可证");
semaphore.release();
}
}
}

信号量能被 FixedThreadPool 替代吗?

如果在应用程序中会有不同类型的任务,它们也是通过不同的线程池来调用慢服务的。

因为信号量具有跨线程、跨线程池的特性,所以即便请求来自于不同的线程池,我们也可以限制它们的访问。

如果用 FixedThreadPool 去限制,那就做不到跨线程池限制了。

CountDownLatch

在创建实例的时候,在构造函数中传入倒数次数,然后由需要等待的线程去调用 await 方法开始等待。

而每一次其他线程调用了 countDown 方法之后,计数便会减 1,直到减为 0 时,之前等待的线程便会继续运行。

场景1:一个线程等待其他多个线程都执行完毕,再继续自己的工作:

比如在比赛跑步时有 5 个运动员参赛,终点有一个裁判员,什么时候比赛结束呢?

  • 当所有人都跑到终点之后,这相当于裁判员等待 5 个运动员都跑到终点,宣布比赛结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RunDemo {

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {

@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println(no + "号运动员完成了比赛。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
};
service.submit(runnable);
}
System.out.println("等待5个运动员都跑完.....");
latch.await();
System.out.println("所有人都跑完了,比赛结束。");
}
}

场景2:多个线程等待某一个线程的信号,同时开始执行。

比如在运动会上,在运动员起跑之前都会等待裁判员发号施令,一声令下运动员统一起跑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RunDemo {

public static void main(String[] args) throws InterruptedException {
System.out.println("运动员有5秒的准备时间");
CountDownLatch countDownLatch = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(no + "号运动员准备完毕,等待裁判员的发令枪");
try {
countDownLatch.await();
System.out.println(no + "号运动员开始跑步了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.submit(runnable);
}
Thread.sleep(5000);
System.out.println("5秒准备时间已过,发令枪响,比赛开始!");
countDownLatch.countDown();
}
}

CyclicBarrier

CyclicBarrier 构造出一个集结点,当某一个线程执行 await() 的时候,它就会到这个集结点开始等待,等待这个栅栏被撤销。

直到预定数量的线程都到了这个集结点之后,这个栅栏就会被撤销,之前等待的线程就在此刻统一出发,继续去执行剩下的任务。

假设班级春游去公园里玩,并且会租借三人自行车,每个人都可以骑,但由于这辆自行车是三人的,所以要凑齐三个人才能骑一辆。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CyclicBarrierDemo {

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 6; i++) {
new Thread(new Task(i + 1, cyclicBarrier)).start();
}
}

static class Task implements Runnable {

private int id;
private CyclicBarrier cyclicBarrier;

public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("同学" + id + "现在从大门出发,前往自行车驿站");
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("同学" + id + "到了自行车驿站,开始等待其他人到达");
cyclicBarrier.await();
System.out.println("同学" + id + "开始骑车");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

LockSupport

AQS内部控制线程阻塞和唤醒是通过LockSupport来实现的。

1
2
3
4
5
@IntrinsicCandidate
public native void park(boolean isAbsolute, long time);

@IntrinsicCandidate
public native void unpark(Object thread);

CopyOnWriteArrayList

具有以下特征:

线程安全的,多线程环境下可以直接使用,无需加锁。

通过锁 + 数组拷贝 + volatile 关键字保证了线程安全。

  • volatile:值被修改后,其它线程能够立马感知最新值。

每次数组操作,都会把数组拷贝一份出来,在新数组上进行操作,操作成功之后再赋值回去。

CopyOnWriteArrayList数据结构和ArrayList是一致的,底层是个数组,只不过CopyOnWriteArrayList在对数组进行操作的时候。

基本会分四步走:

1、加锁。

2、从原数组中拷贝出新数组。

3、在新数组上进行操作,并把新数组复制给数组容器。

4、解锁。

新增:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 添加元素到数组尾部
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 得到所有的原数组
Object[] elements = getArray();
int len = elements.length;
// 拷贝到新数组里面,新数组的长度是 + 1 的,因为新增会多一个元素
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 在新数组中进行赋值,新元素直接放在数组的尾部
newElements[len] = e;
// 替换掉原来的数组
setArray(newElements);
return true;
// finally 里面释放锁,保证即使 try 发生了异常,仍然能够释放锁
} finally {
lock.unlock();
}
}

适用场景:

读多写少的场景很适合使用 CopyOnWrite 集合。

缺点:

内存占用问题

  • 因为 CopyOnWrite 的写时复制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存。
  • 这一点会占用额外的内存空间。

在元素较多或者复杂的情况下,复制的开销很大

  • 复制过程不仅会占用双倍内存,还要消耗 CPU 等资源,会降低整体性能。

数据一致性问题

  • CopyOnWrite 容器的修改是先修改副本,所以这次修改对于其他线程来说,并不是实时能看到的。
  • 只有在修改完后才能体现出来。