当前位置: 首页 > news >正文

深入浅出JUC常用同步器

文章目录

    • 1.JUC下同步器
      • 1.1 CountdownLatch 倒计数锁存器
      • 1.2 CyclicBarrier回环屏障
      • 1.3 Semephone 信号量
    • 2.小结

1.JUC下同步器

   日常开发会遇到主线程开启多个子线程去并行执行任务,并且主线程需要等待所有子线程执行完后在进行汇总的场景。
   同步器出现之前,通常采用Thread.join()方法来实现,join方法不够灵活,JDK大佬就在JUC下新建了几个同步器,底层都是基于AQS实现。
   关于AQS可以看看这篇一文带你看懂Java多线程并发,深度剖析AQS源码

下面就针对JUC下三种常见同步器进行简要介绍。

1.1 CountdownLatch 倒计数锁存器

这个同步器相对比较简单,先使用构造方法初始化共享锁数count,然后每次调用countDown()方法, 内部调用sync.releaseShared(1)释放一把锁,锁数减一,直到锁Count等于0则会唤醒之前使用await()方法阻塞的线程。

先看这个tryReleaseShared()方法

	    public void countDown() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {signalNext(head);return true;}return false;}// 着重看这个方法protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {// 当前锁数int c = getState();// 锁已经为0 则不在执行减一,避免多线程下重复减一到负数if (c == 0)return false;// 锁减一int nextc = c - 1;// CAS操作原子性保证一个线程执行if (compareAndSetState(c, nextc))// 如果为0则为true那么将执行signalNext(head)方法return nextc == 0;}}// h 参数为head 头结点。 private static void signalNext(Node h) {Node s;// 如果头结点下一个节点不为空。if (h != null && (s = h.next) != null && s.status != 0) {// 取消WAITTING状态 转为唤醒状态s.getAndUnsetStatus(WAITING);// 唤醒s节点所对应的线程。LockSupport.unpark(s.waiter);}// CAS Node 节点 部分属性如下abstract static class Node {volatile Node prev;       // initially attached via casTailvolatile Node next;       // visibly nonnull when signallableThread waiter;            // visibly nonnull when enqueuedvolatile int status;      // written by owner, atomic bit ops by others}}

再接着看await() 方法是如何让线程陷入阻塞的。

    public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 正常Thread没有调用中断方法,会执行tryAcquireShared 方法if (Thread.interrupted() ||(tryAcquireShared(arg) < 0 &&acquire(null, arg, true, true, false, 0L) < 0))throw new InterruptedException();}// 初始化CountDownLatch对象时,getState()的值就已经发生变化// 因此这个通常都是返回-1。protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// 正常逻辑都是走这个方法,代码量太大,着重分析部分关键信息。
final int acquire(Node node, int arg, boolean shared,boolean interruptible, boolean timed, long time) {// 当前线程实例              Thread current = Thread.currentThread();byte spins = 0, postSpins = 0;   // retries upon unpark of first threadboolean interrupted = false, first = false;Node pred = null;               // predecessor of node when enqueuedfor (;;) {if (!first && (pred = (node == null) ? null : node.prev) != null &&!(first = (head == pred))) {if (pred.status < 0) {cleanQueue();           // predecessor cancelledcontinue;} else if (pred.prev == null) {Thread.onSpinWait();    // ensure serializationcontinue;}}if (first || pred == null) {boolean acquired;try {if (shared)acquired = (tryAcquireShared(arg) >= 0);elseacquired = tryAcquire(arg);} catch (Throwable ex) {cancelAcquire(node, interrupted, false);throw ex;}if (acquired) {if (first) {node.prev = null;head = node;pred.next = null;node.waiter = null;if (shared)signalNextIfShared(node);if (interrupted)current.interrupt();}return 1;}}Node t;if ((t = tail) == null) {           // initialize queueif (tryInitializeHead() == null)return acquireOnOOME(shared, arg);} else if (node == null) {          // allocate; retry before enqueuetry {node = (shared) ? new SharedNode() : new ExclusiveNode();} catch (OutOfMemoryError oome) {return acquireOnOOME(shared, arg);}} else if (pred == null) {          // try to enqueue// 节点waiter存放当前线程实例。node.waiter = current;node.setPrevRelaxed(t);         // avoid unnecessary fenceif (!casTail(t, node))node.setPrevRelaxed(null);  // back outelse// t 是头节点。node就是t后置节点t.next = node;} else if (first && spins != 0) {--spins;                        // reduce unfairness on rewaitsThread.onSpinWait();} else if (node.status == 0) {// 节点状态设置为WAITING; 后面唤醒用。node.status = WAITING;          // enable signal and recheck} else {long nanos;spins = postSpins = (byte)((postSpins << 1) | 1);if (!timed)LockSupport.park(this);else if ((nanos = time - System.nanoTime()) > 0L)LockSupport.parkNanos(this, nanos);elsebreak;node.clearStatus();if ((interrupted |= Thread.interrupted()) && interruptible)break;}}return cancelAcquire(node, interrupted, interruptible);}

由于代码量太大,执行逻辑相对比较复杂,截取部分代码进行解析。

1.如果tail指针为null,初始化头结点,头结点为null 【第一个if】
2. node为空,是否共享,是则构建共享锁节点,否则构建独占锁节点。【else if】
3. pred 是前置节点 如果为空,设置当前node节点waiter 为当前线程,【else if】
node.status=WAITING; 设置当前节点状态为WAITING;

在这里插入图片描述

1.2 CyclicBarrier回环屏障

由于CountDownLatch是一次性同步方案,一旦计数器state=0后续在调用CountDown()方法就没用了,因此JDK大佬又创建了CyclicBarrier,可以通过reset()重置状态,让一组线程同步后可继续同步执行。适用于分段任务有序执行场景,这里对源码就不在探讨。采用独占锁ReentrantLock实现计数器原子更新。这个同步器相当于CountDownLatch增强版本,效率一般要略低,CountDownLatch 采用CAS来保证原子性。

重置方法reset()
在这里插入图片描述

1.3 Semephone 信号量

同步计数器递增实现,默认采用非公平策略,但是还可以通过参数传递来设置公平策略,可以实现以上两种同步器功能,但是计数器不可以自动重置,相对来说,功能更加强大。以上三种同步器都是基于AQS实现,因此大家需要重点掌握AQS,则可轻松看懂同步器源码实现方式。

2.小结

  关于以上三种同步器的使用,需要根据不同应用场景进行使用。

  1. 对于分段任务或者多个线程任务执行到指定位置需要进行聚合处理的 情况,建议使用CyclicBarrier同步器,
  2. 如没有特殊需求,就是一个简单的多个线程同步采用CountdownLatch同步器,
  3. 其他情况建议使用Semephone同步器,相对来说功能更为强大。

http://www.mrgr.cn/news/70747.html

相关文章:

  • django启动项目报错解决办法
  • 二级等保要求及设备有哪些?
  • C++中的原子操作:原子性、内存顺序、性能优化与原子变量赋值
  • CSS3_BFC(十二)
  • DimensionX:单图生成任意的3d/4d视图
  • 综合练习--轮播图
  • 【漏洞复现】用友 U8CRM leadconversion.php Sql注入漏洞
  • 基于STM32U575RIT6智能除湿器项目
  • 【星闪EBM-H63开发板】AT固件的配置与测试
  • 121页PPT | 企业战略规划全景:战略设计、工具、模板和分析方法
  • JAVA完成猜数字小游戏
  • python练习-Django web入门
  • STM32:ADC
  • 万字长文解读机器学习——决策树
  • [C++]——位图与布隆过滤器
  • Rust 模板匹配——根据指定图片查找处于大图中的位置(支持GPU加速)
  • APP封装系统 app误报毒app可上传 自动实现5分钟随机更换包名和签名
  • VMnet NAT模式配置
  • Perfetto中如何使用SQL语句
  • MutationObserver与IntersectionObserver的区别
  • IEEE JSSC更新|Tiny Tapeout:让每个人都能设计定制芯片
  • 【C】一文速学----线程池原理与实战
  • 【计算机网络】网络框架
  • C0028.在Clion中快速生成头文件中声明的函数的方法
  • 车载诊断架构---域控下挂节点信息同步策略
  • 基于51单片机密码锁—有3个密码lcd1602显示