深入浅出JUC常用同步器
文章目录
- 1.JUC下同步器
- 1.1 CountdownLatch 倒计数锁存器
- 1.2 CyclicBarrier回环屏障
- 1.3 Semephone 信号量
- 2.小结
1.JUC下同步器
日常开发会遇到主线程开启多个子线程去并行执行任务,并且主线程需要等待所有子线程执行完后在进行汇总的场景。
同步器出现之前,通常采用Thread.join()方法来实现,join方法不够灵活,JDK大佬就在JUC下新建了几个同步器,底层都是基于AQS实现。
关于AQS可以看看这篇一文带你看懂Java多线程并发,深度剖析AQS源码
下面就针对JUC下三种常见同步器进行简要介绍。
1.1 CountdownLatch 倒计数锁存器
这个同步器相对比较简单,先使用构造方法初始化共享锁数count,然后每次调用countDown()方法, 内部调用sync.releaseShared(1)释放一把锁,锁数减一,直到锁Count等于0则会唤醒之前使用await()方法阻塞的线程。
先看这个tryReleaseShared()
方法
public void countDown() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {signalNext(head);return true;}return false;}// 着重看这个方法protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {// 当前锁数int c = getState();// 锁已经为0 则不在执行减一,避免多线程下重复减一到负数if (c == 0)return false;// 锁减一int nextc = c - 1;// CAS操作原子性保证一个线程执行if (compareAndSetState(c, nextc))// 如果为0则为true那么将执行signalNext(head)方法return nextc == 0;}}// h 参数为head 头结点。 private static void signalNext(Node h) {Node s;// 如果头结点下一个节点不为空。if (h != null && (s = h.next) != null && s.status != 0) {// 取消WAITTING状态 转为唤醒状态s.getAndUnsetStatus(WAITING);// 唤醒s节点所对应的线程。LockSupport.unpark(s.waiter);}// CAS Node 节点 部分属性如下abstract static class Node {volatile Node prev; // initially attached via casTailvolatile Node next; // visibly nonnull when signallableThread waiter; // visibly nonnull when enqueuedvolatile int status; // written by owner, atomic bit ops by others}}
再接着看await()
方法是如何让线程陷入阻塞的。
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 正常Thread没有调用中断方法,会执行tryAcquireShared 方法if (Thread.interrupted() ||(tryAcquireShared(arg) < 0 &&acquire(null, arg, true, true, false, 0L) < 0))throw new InterruptedException();}// 初始化CountDownLatch对象时,getState()的值就已经发生变化// 因此这个通常都是返回-1。protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// 正常逻辑都是走这个方法,代码量太大,着重分析部分关键信息。
final int acquire(Node node, int arg, boolean shared,boolean interruptible, boolean timed, long time) {// 当前线程实例 Thread current = Thread.currentThread();byte spins = 0, postSpins = 0; // retries upon unpark of first threadboolean interrupted = false, first = false;Node pred = null; // predecessor of node when enqueuedfor (;;) {if (!first && (pred = (node == null) ? null : node.prev) != null &&!(first = (head == pred))) {if (pred.status < 0) {cleanQueue(); // predecessor cancelledcontinue;} else if (pred.prev == null) {Thread.onSpinWait(); // ensure serializationcontinue;}}if (first || pred == null) {boolean acquired;try {if (shared)acquired = (tryAcquireShared(arg) >= 0);elseacquired = tryAcquire(arg);} catch (Throwable ex) {cancelAcquire(node, interrupted, false);throw ex;}if (acquired) {if (first) {node.prev = null;head = node;pred.next = null;node.waiter = null;if (shared)signalNextIfShared(node);if (interrupted)current.interrupt();}return 1;}}Node t;if ((t = tail) == null) { // initialize queueif (tryInitializeHead() == null)return acquireOnOOME(shared, arg);} else if (node == null) { // allocate; retry before enqueuetry {node = (shared) ? new SharedNode() : new ExclusiveNode();} catch (OutOfMemoryError oome) {return acquireOnOOME(shared, arg);}} else if (pred == null) { // try to enqueue// 节点waiter存放当前线程实例。node.waiter = current;node.setPrevRelaxed(t); // avoid unnecessary fenceif (!casTail(t, node))node.setPrevRelaxed(null); // back outelse// t 是头节点。node就是t后置节点t.next = node;} else if (first && spins != 0) {--spins; // reduce unfairness on rewaitsThread.onSpinWait();} else if (node.status == 0) {// 节点状态设置为WAITING; 后面唤醒用。node.status = WAITING; // enable signal and recheck} else {long nanos;spins = postSpins = (byte)((postSpins << 1) | 1);if (!timed)LockSupport.park(this);else if ((nanos = time - System.nanoTime()) > 0L)LockSupport.parkNanos(this, nanos);elsebreak;node.clearStatus();if ((interrupted |= Thread.interrupted()) && interruptible)break;}}return cancelAcquire(node, interrupted, interruptible);}
由于代码量太大,执行逻辑相对比较复杂,截取部分代码进行解析。
1.如果tail指针为null,初始化头结点,头结点为null 【第一个if】
2. node为空,是否共享,是则构建共享锁节点,否则构建独占锁节点。【else if】
3. pred 是前置节点 如果为空,设置当前node节点waiter 为当前线程,【else if】
node.status=WAITING; 设置当前节点状态为WAITING;
1.2 CyclicBarrier回环屏障
由于CountDownLatch是
一次性
同步方案,一旦计数器state=0后续在调用CountDown()方法就没用了,因此JDK大佬又创建了CyclicBarrier,可以通过reset()重置状态
,让一组线程同步后可继续同步执行。适用于分段任务有序执行场景
,这里对源码就不在探讨。采用独占锁ReentrantLock实现计数器原子更新。这个同步器相当于CountDownLatch增强版本,效率一般要略低,CountDownLatch 采用CAS来保证原子性。
重置方法reset()
1.3 Semephone 信号量
同步计数器递增实现,默认采用非公平策略,但是还可以通过参数传递来设置公平策略,可以
实现以上两种同步器功能
,但是计数器不可以自动重置
,相对来说,功能更加强大。以上三种同步器都是基于AQS实现
,因此大家需要重点掌握AQS,则可轻松看懂同步器源码实现方式。
2.小结
关于以上三种同步器的使用,需要根据不同应用场景进行使用。
- 对于
分段任务
或者多个线程任务执行到指定位置需要进行聚合处理
的 情况,建议使用CyclicBarrier
同步器,- 如没有特殊需求,就是一个简单的多个线程同步采用
CountdownLatch
同步器,- 其他情况建议使用
Semephone
同步器,相对来说功能更为强大。