当前位置: 首页 > news >正文

高并发设计模式之ForkJoin模式

分而治之是一种思想,所谓分而治之就是把一个复杂的算法问题按一定的分解方法分为规模较小的若干部分,然后逐个解决,分别找出各部分的解,最后把各部分的解在整合成整个问题的解.ForkJoin模式就是分而治之思想的另一种应用.

ForkJoin模式的原理

ForkJoin模式先把一个大任务分解成许多独立的子任务,然后开启多个线程并行去处理这些子任务.执行的过程大致如下.

 ForkJoin框架

JUC包提供了一套ForkJoin框架的实现,具体以ForkJoinPool线程池的形式提供,并且该线程池在java8的Lambda并行流框架中充当着底层框架的角色.包含如下组件.

1:ForkJoinPool 执行任务的线程池,继承了 AbstractExecutorService类.

2:ForkJoinWorkerThread 执行任务的工作线程.每个线程都维护这一个内部队列,用于存放内部任务.该类继承了Thread类.

3:ForkJoinTask: 用于ForkJoinPool的任务抽象类,实现了Future接口.

4: RecursiveAction 不返回结果的递归执行任务,是ForkJoinTask的子类,在子任务不带返回结果时使用.

ForkJoin框架的使用实战

假设需要计算0-100的累加求和,可以使用ForkJoin框架完成.首先需要设计一个可以递归执行的异步子任务类.

1:可递归执行的异步任务类AccumulateTask

参考代码如下.

public class AccumulateTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 2;/*** 累加的起始编号.*/private int start;/*** 累加的结束编号.*/private int end;public AccumulateTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//判断任务的规模.若规模小则可以直接计算.Boolean canCompute = end - start <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}System.out.println("执行任务,计算" + start + "到" + end + "的和,结果是" + sum);} else {//任务过大需要进行切割.int middle = (start + end) / 2;//切割成两个任务.AccumulateTask lTask = new AccumulateTask(start, middle);AccumulateTask rTask = new AccumulateTask(middle + 1, end);//依次调用每个子任务的fork方法.lTask.fork();rTask.fork();//等待子任务完成.依次调用每个子任务的join方法合并并执行结果.Integer leftResult = lTask.join();Integer rightResult = rTask.join();//合并子任务的结果.sum = leftResult + rightResult;}return sum;}
}

自定义的异步任务子类AccumulateTask继承自RecursiveTask,每一次执行可以携带返回值.在AccumulateTask通过THRESHOLD常量设置子任务分解的阈值,并在它的compute()方法中会进行阈值的判断.逻辑如下.

1:若当前的计算规模大于THRESHOLD,则当前任需要进一步分解.相反则直接求和.

2:如果子任务可以直接执行,则进行求和操作,并返回结果.如果任务进行了分解,就需要等待所有的任务执行完毕,然后对各个分解结果进行求和.

ForkJoinPool调度AccumulateTask()

参考代码如下.

public class ForkJoinTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ForkJoinPool forkJoinPool = new ForkJoinPool();AccumulateTask task = new AccumulateTask(1, 100);ForkJoinTask<Integer> future = forkJoinPool.submit(task);Integer integer = future.get(1, TimeUnit.SECONDS);System.out.println("最终计算结果为" + integer);}
}

ForkJoinPool框架的核心API

 ForkJoinPool框架的核心是ForkJoinPool线程池.该线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,咋可能被挂起,而挂起的线程将被压入ForkJoinPool维护的栈中,等待新任务到来.

1:ForkJoinPool的构造器

public ForkJoinPool(int parallelism,//并行度 默认为cpu数,最小为1.ForkJoinWorkerThreadFactory factory,//线程创建工厂UncaughtExceptionHandler handler,//异常处理程序boolean asyncMode) {///是否为异步模式.this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();}

 parallelism:并行级别

设定的级别决定框架内并行线程的数量.并行的每一个任务都会有一个线程进行处理,但这个属性并不是框架的最大线程数量,该属性也和ThreadPoolExecutor线程池中的核心线程数和最大线程数有区别.因为他们的工作方式不一样.Forkjoin框架中可存在的线程数量和parallelism参数并不是绝对的关联.

Factory:线程创建工厂

当ForkJoin框架创建一个新的线程,同样会用到线程工厂,只不过这个线程不在需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口.后者是一个函数接口.只需要实现一个叫newThread()方法,在ForkJoin框架中有一个默认的实现:DefaultForkJoinWorkerThreadFactory.

handler:异常捕获处理程序

当执行任务中出现异常,并从任务中被抛出时,就会被handler捕获.

 asyncMode:异步模式

该参数表示任务是否为异步模式,默认值为false.如果参数值为true,表示子任务的执行遵循FIFO(先进先出)顺序,并且子任务不能被合并,如果为false,表示子任务的执行顺序为LIFO(后进先出),并且子任务可以被合并.虽然表面意思为异步,仅仅指任务调度.

ForkJoinPool的common通用池

private static ForkJoinPool makeCommonPool() {final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =new CommonPoolForkJoinWorkerThreadFactory();int parallelism = -1;ForkJoinWorkerThreadFactory factory = null;UncaughtExceptionHandler handler = null;try {  // ignore exceptions in accessing/parsing propertiesString pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");String fp = System.getProperty("java.util.concurrent.ForkJoinPool.common.threadFactory");String hp = System.getProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler");if (pp != null)parallelism = Integer.parseInt(pp);if (fp != null)factory = ((ForkJoinWorkerThreadFactory)ClassLoader.getSystemClassLoader().loadClass(fp).newInstance());if (hp != null)handler = ((UncaughtExceptionHandler)ClassLoader.getSystemClassLoader().loadClass(hp).newInstance());} catch (Exception ignore) {}if (factory == null) {if (System.getSecurityManager() == null)factory = commonPoolForkJoinWorkerThreadFactory;else // use security-managed defaultfactory = new InnocuousForkJoinWorkerThreadFactory();}if (parallelism < 0 && // default 1 less than #cores(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP;return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,"ForkJoinPool.commonPool-worker-");}

使用common线程池的优点是可以通过指定系统属性的方式定义并行度 线程工厂和异常处理类. 

向ForkJoinPool线程池提交任务的方式

1:外部任务提交 

提交外部任务有三种方式:方式一使用invoke方法,该方法提交任务后线程会等待,等到任务计算完毕并返回结果.方法二使用execute()方法提交一个任务来异步执行,无返回结果.方法三使用submit方法提交一个任务,会返回一个ForkJoinTask实例,适当的时候获取结果.

2:子任务提交.

由任务实例的fork方法完成,当任务被分解以后,内部会调用ForkJoinPool.WorkQueue.push方法直接把任务放到内部队列中等待被执行.

工作窃取算法

 ForkJoinPool线程池的任务分为外部任务和内部任务,两种任务存放的位置不同.

1:外部任务放在ForkJoinPool全局队列中.

2:子任务会存放在内部队列中,ForkJoinPool线程池中的每个线程池都维护这一个内部队列.用于存放这些内部任务.

由于ForkJoinPool线程池有多个工作线程,与之相对应的就会有多个任务队列,就会出现任务分配不均衡的问题,有的任务队列任务多,忙的不停,有的队列没有任务一直空闲.工作窃取方法可以完美解决.

工作窃取的核心思想是:工作线程自己的活干完了,会去看看别人有没有没干完的活,如果有就拿过来帮忙做.每个线程拥有一个双端队列用于存放需要执行的任务,当自己队列没任务的时候,可以从其他队列里获取任务执行.

ForkJoin框架的原理

1:ForkJoin框架的线程池ForkJoinPool分为外部任务和内部任务.

2:外部任务是放在ForkJoinPool的全局队列中.

3:ForkJoinPool池中的每个线程都维护这一个内部任务队列用于存放内部任务,线程池切割的子任务就放在内部任务当中.

4:当工作线程想要拿到子任务的计算结果时,先判断子任务有没有完成.如果没有完成,再去判断子任务有没有被窃取.如果子任务没有被窃取就由本线程来完成.如果被窃取了,就去执行内部队列的其他任务.或者去扫描其他队列的任务.

5:当工作线程完成自己内部的任务,就会去扫描其他队列的任务.尽可能不会阻塞等待.

月盈则亏,月缺则满.修心则以.

如果大家喜欢我的分享的话,可以关注下我的微信公众号

心有九月星辰

 

 

 
 
 

 
 
 
 


http://www.mrgr.cn/news/62925.html

相关文章:

  • ZooKeeper 核心知识全解析:架构、角色、节点与应用
  • 【WPS】【WORDWORD】【JavaScript】实现微软WORD自动更正的效果
  • 牛客周赛 Round 76题解
  • HTML中meta的用法
  • java开发
  • 《Spring Framework实战》6:核心技术 4.1.IoC 容器
  • 客户的奇葩要求—在CAN网络的基础上加入了CAN_FD的节点
  • Redis(持续更新ing。。。)
  • 贪心算法习题其二【力扣】【算法学习day.18】
  • Rust 力扣 - 1343. 大小为 K 且平均值大于等于阈值的子数组数目
  • 博图V19的DB块,批量导入组态王
  • 如何去掉idea的Usage提示
  • 【英特尔IA-32架构软件开发者开发手册第3卷:系统编程指南】2001年版翻译,2-9
  • 关于自动驾驶等级相关知识
  • OpenCV与AI深度学习 | 基于OpenCV和深度学习预测年龄和性别
  • “换行”与“回车”
  • 深度数据修复软件哪个好?盘点2024年好用的4个数据恢复工具。
  • Redis新数据类型
  • 科研修图Adobe全家桶access
  • ANA基因组数据库(ANAgdb)
  • 【Leetcode】单调栈
  • 强化学习DQN实践(gymnasium+pytorch)
  • 人工智能生产力悖论:为什么大多数人没用上AI工具?
  • 得物App获评新奖项,正品保障夯实供应链创新水平
  • 今年双11,拼多多吹“新”风
  • tinyint数据库类型