常用并发工具类!
发表于更新于
Semaphore信号量
信号量来控制那些需要限制并发访问量的资源。
信号量会维护 许可证 的计数,而线程去访问共享资源前,必须先拿到许可证。
线程可以从信号量中去获取一个许可证,一旦线程获取之后,信号量持有的许可证就转移过去了。
所以信号量手中剩余的许可证要减一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class SemaphoreDemo {
static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(50); for (int i = 0; i < 1000; i++) { service.submit(new Task()); } service.shutdown(); }
static class Task implements Runnable {
@Override public void run() { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "拿到了许可证,花费2秒执行慢服务"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("慢服务执行完毕," + Thread.currentThread().getName() + "释放了许可证"); semaphore.release(); } } }
|
信号量能被 FixedThreadPool 替代吗?
如果在应用程序中会有不同类型的任务,它们也是通过不同的线程池来调用慢服务的。
因为信号量具有跨线程、跨线程池的特性,所以即便请求来自于不同的线程池,我们也可以限制它们的访问。
如果用 FixedThreadPool 去限制,那就做不到跨线程池限制了。
CountDownLatch
在创建实例的时候,在构造函数中传入倒数次数,然后由需要等待的线程去调用 await 方法开始等待。
而每一次其他线程调用了 countDown 方法之后,计数便会减 1,直到减为 0 时,之前等待的线程便会继续运行。
场景1:一个线程等待其他多个线程都执行完毕,再继续自己的工作:
比如在比赛跑步时有 5 个运动员参赛,终点有一个裁判员,什么时候比赛结束呢?
- 当所有人都跑到终点之后,这相当于裁判员等待 5 个运动员都跑到终点,宣布比赛结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class RunDemo {
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = new Runnable() {
@Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println(no + "号运动员完成了比赛。"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } }; service.submit(runnable); } System.out.println("等待5个运动员都跑完....."); latch.await(); System.out.println("所有人都跑完了,比赛结束。"); } }
|
场景2:多个线程等待某一个线程的信号,同时开始执行。
比如在运动会上,在运动员起跑之前都会等待裁判员发号施令,一声令下运动员统一起跑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class RunDemo {
public static void main(String[] args) throws InterruptedException { System.out.println("运动员有5秒的准备时间"); CountDownLatch countDownLatch = new CountDownLatch(1); ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = new Runnable() { @Override public void run() { System.out.println(no + "号运动员准备完毕,等待裁判员的发令枪"); try { countDownLatch.await(); System.out.println(no + "号运动员开始跑步了"); } catch (InterruptedException e) { e.printStackTrace(); } } }; service.submit(runnable); } Thread.sleep(5000); System.out.println("5秒准备时间已过,发令枪响,比赛开始!"); countDownLatch.countDown(); } }
|
CyclicBarrier
CyclicBarrier 构造出一个集结点,当某一个线程执行 await() 的时候,它就会到这个集结点开始等待,等待这个栅栏被撤销。
直到预定数量的线程都到了这个集结点之后,这个栅栏就会被撤销,之前等待的线程就在此刻统一出发,继续去执行剩下的任务。
假设班级春游去公园里玩,并且会租借三人自行车,每个人都可以骑,但由于这辆自行车是三人的,所以要凑齐三个人才能骑一辆。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class CyclicBarrierDemo {
public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3); for (int i = 0; i < 6; i++) { new Thread(new Task(i + 1, cyclicBarrier)).start(); } }
static class Task implements Runnable {
private int id; private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier) { this.id = id; this.cyclicBarrier = cyclicBarrier; }
@Override public void run() { System.out.println("同学" + id + "现在从大门出发,前往自行车驿站"); try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("同学" + id + "到了自行车驿站,开始等待其他人到达"); cyclicBarrier.await(); System.out.println("同学" + id + "开始骑车"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
|
LockSupport
AQS
内部控制线程阻塞和唤醒是通过LockSupport
来实现的。
1 2 3 4 5
| @IntrinsicCandidate public native void park(boolean isAbsolute, long time);
@IntrinsicCandidate public native void unpark(Object thread);
|
CopyOnWriteArrayList
具有以下特征:
线程安全的,多线程环境下可以直接使用,无需加锁。
通过锁 + 数组拷贝 + volatile 关键字保证了线程安全。
- volatile:值被修改后,其它线程能够立马感知最新值。
每次数组操作,都会把数组拷贝一份出来,在新数组上进行操作,操作成功之后再赋值回去。
CopyOnWriteArrayList数据结构和ArrayList是一致的,底层是个数组,只不过CopyOnWriteArrayList在对数组进行操作的时候。
基本会分四步走:
1、加锁。
2、从原数组中拷贝出新数组。
3、在新数组上进行操作,并把新数组复制给数组容器。
4、解锁。
新增:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
|
适用场景:
读多写少的场景很适合使用 CopyOnWrite 集合。
缺点:
内存占用问题
- 因为 CopyOnWrite 的写时复制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存。
- 这一点会占用额外的内存空间。
在元素较多或者复杂的情况下,复制的开销很大
- 复制过程不仅会占用双倍内存,还要消耗 CPU 等资源,会降低整体性能。
数据一致性问题
- CopyOnWrite 容器的修改是先修改副本,所以这次修改对于其他线程来说,并不是实时能看到的。
- 只有在修改完后才能体现出来。