当前位置: 首页 > news >正文

鸿蒙多线程开发——TaskPool任务池

1、前 言

目前ArkTS提供了TaskPool和Worker两种并发能力,TaskPool和Worker都基于Actor并发模型实现(关于Actor并发模型和内存共享模型的区别,参考之前的文章 鸿蒙多线程开发——并发模型对比(Actor与内存共享))。

本文先针对TaskPool做介绍。

2、TaskPool

2.1、基本介绍

TaskPool 作用是为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能,且您无需关心线程实例的生命周期。

有一定开发经验的朋友应该对线程池不陌生。大致描述如下:

TaskPool允许开发者在主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的能力,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,具体数量内部管理,保证最优的调度及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。示意图如下:

图片

2.2、注意事项

在使用TaskPool时,有一些需要注意的事项,如下:

  • 实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用。

  • 从API version 11开始,实现任务的函数需要使用类方法时,该类必须使用装饰器@Sendable装饰器标注,且仅支持在.ets文件中使用。

  • 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。

  • 实现任务的函数入参需满足序列化支持的类型。

  • ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。

  • 由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。

  • 序列化传输的数据量大小限制为16MB。

  • Priority的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份。),它的优先级别是最低的。这种优先级标记的任务只会在所有线程都空闲的情况下触发执行,并且只会占用一个线程来执行。

  • Promise不支持跨线程传递,不能作为concurrent function的返回值。

  • 不支持在TaskPool工作线程中使用AppStorage。

正面案例:

// 正例@Concurrentasync function asyncFunc(val1:number, val2:number): Promise<number> {  let ret: number = await new Promise((resolve, reject) => {    let value = val1 + val2;    resolve(value);  });  return ret; // 支持。直接返回Promise的结果。}function taskpoolExecute() {  taskpool.execute(asyncFunc, 10, 20).then((result: Object) => {    console.info("taskPoolTest task result: " + result);  }).catch((err: string) => {    console.error("taskPoolTest test occur error: " + err);  });}taskpoolExecute()

反面案例:​​​​​​​

// 反例1:@Concurrentasync function asyncFunc(val1:number, val2:number): Promise<number> {  let ret: number = await new Promise((resolve, reject) => {    let value = val1 + val2;    resolve(value);  });  return Promise.resolve(ret); // 不支持。Promise.resolve仍是Promise,其状态是pending,无法作为返回值使用。}// 反例2:@Concurrentasync function asyncFunc(val1:number, val2:number): Promise<number> {  // 不支持。其状态是pending,无法作为返回值使用。  return new Promise((resolve, reject) => {    setTimeout(() => {      let value = val1 + val2;      resolve(value);    }, 2000);  }); }function taskpoolExecute() {  taskpool.execute(asyncFunc, 10, 20).then((result: Object) => {    console.info("taskPoolTest task result: " + result);  }).catch((err: string) => {    console.error("taskPoolTest test occur error: " + err);  });}taskpoolExecute()

3、TaskPool接口介绍

  • 模块导入

import { taskpool } from '@kit.ArkTS';
  • 核心API

// 将待执行的函数放入taskpool内部任务队列 (函数不会立即执行,而是等待分发到工作线程执行。当前执行模式不可取消任务)execute(func: Function, ...args: Object[]): Promise<Object>// 将创建好的任务放入taskpool内部任务队列 (任务不会立即执行,而是等待分发到工作线程执行。当前执行模式可以设置任务优先级和尝试调用cancel进行任务取消。该任务不可以是任务组任务和串行)execute(task: Task, priority?: Priority): Promise<Object>// 延时执行任务。当前执行模式可以设置任务优先级和尝试调用cancel进行任务取消。该任务不可以是任务组任务、串行队列任务和周期任务。若该任务非长时任务,可以多次调用executeDelayed执行,长时任务仅支持执行一次。executeDelayed(delayTime: number, task: Task, priority?: Priority): Promise<Object>// 周期执行任务,每隔period时长执行一次任务。当前执行模式支持设置任务优先级和调用cancel取消任务周期执行。周期任务不可以是任务组任务和串行队列任务,不可以再次调用执行接口,不可以拥有依赖关系。executePeriodically(period: number, task: Task, priority?: Priority): void// 取消任务池中的任务。当任务在taskpool等待队列中,取消该任务后该任务将不再执行,并返回undefined作为结果;当任务已经在taskpool工作线程执行,取消该任务并不影响任务继续执行,执行结果在catch分支返回,搭配isCanceled使用可以对任务取消行为作出响应。taskpool.cancel对其之前的taskpool.execute/taskpool.executeDelayed生效。cancel(task: Task): void// 取消任务池中的任务组。当一个任务组的任务未全部执行结束时取消任务组,返回undefined作为任务组结果。cancel(group: TaskGroup): void// 中止任务池中的长时任务,在长时任务执行完成后调用。中止后,执行长时任务的线程可能会被回收。terminateTask(longTask: LongTask): void// 检查函数是否为并发函数。isConcurrent(func: Function): boolean// 获取任务池内部信息,包含线程信息和任务信息。getTaskPoolInfo(): TaskPoolInfo

在核心API中,有几个关键的类或枚举:Priority、Task。定义如下:​​​​​​​

enum Priority {  HIGH = 0,  //任务为高优先级。  MEDIUM = 1, // 任务为中优先级。  LOW = 2,   // 任务为低优先级。  IDLE = 3   //任务为后台任务。}class Task {  // 属性  function: Function // 创建任务时需要传入的函数,支持的函数返回值类型请查序列化支持类型。  arguments: Object[] // 创建任务传入函数所需的参数,支持的参数类型请查序列化支持类型。  name: string  // 创建任务时指定的任务名称。  totalDuration: number  // 执行任务总耗时。  ioDuration: number  // 执行任务异步IO耗时。  cpuDuration: number  // 执行任务CPU耗时。    // 方法  // 构造函数  constructor(func: Function, ...args: Object[])  constructor(name: string, func: Function, ...args: Object[])  static isCanceled(): boolean; // 任务是否已经取消  isDone(): boolean // 检查任务是否已完成。  /**   设置任务的传输列表。   此接口可以设置任务池中ArrayBuffer的transfer列表,transfer列表中的ArrayBuffer对象在传输时不会复制buffer内容到工作线程而是转移buffer控制权至工作线程,传输后当前的ArrayBuffer失效。若ArrayBuffer为空,则不会transfer转移。   */  setTransferList(transfer?: ArrayBuffer[]): void  /**    设置任务的拷贝列表。    需搭配@Sendable装饰器使用,否则会抛异常。   */  setCloneList(cloneList: Object[] | ArrayBuffer[]): void  // 在任务执行过程中向宿主线程发送消息并触发回调。使用该方法前需要先构造Task。  static sendData(...args: Object[]): void  // 为任务注册回调函数,以接收和处理来自任务池工作线程的数据。使用该方法前需要先构造Task。  onReceiveData(callback?: Function): void  // 为当前任务添加对其他任务的依赖。使用该方法前需要先构造Task。该任务和被依赖的任务不可以是任务组任务、串行队列任务、已执行的任务和周期任务。存在依赖关系的任务(依赖其他任务的任务或被依赖的任务)执行后不可以再次执行。  addDependency(...tasks: Task[]): void  // 删除当前任务对其他任务的依赖。使用该方法前需要先构造Task。  removeDependency(...tasks: Task[]): void  // 注册一个回调函数,并在任务入队时调用它。需在任务执行前注册,否则会抛异常。  onEnqueued(callback: CallbackFunction): void  // 注册一个回调函数,并在执行任务前调用它。需在任务执行前注册,否则会抛异常。  onStartExecution(callback: CallbackFunction): void  // 注册一个回调函数,并在任务执行失败时调用它。需在任务执行前注册,否则会抛异常。  onExecutionFailed(callback: CallbackFunctionWithError): void  // 注册一个回调函数,并在任务执行成功时调用它。需在任务执行前注册,否则会抛异常。  onExecutionSucceeded(callback: CallbackFunction): void}

4、案例

此处以频繁读写系统文件来模拟I/O密集型并发任务的处理。

👉🏻 step 1:定义并发函数,内部密集调用I/O能力。​​​​​​​

// write.etsimport { fileIo } from '@kit.CoreFileKit'// 定义并发函数,内部密集调用I/O能力// 写入文件的实现export async function write(data: string, filePath: string): Promise<void> {  let file: fileIo.File = await fileIo.open(filePath, fileIo.OpenMode.READ_WRITE | fileIo.OpenMode.CREATE);  await fileIo.write(file.fd, data);  fileIo.close(file);}// ...// Index.etsimport { write } from './write'import { BusinessError } from '@kit.BasicServicesKit';import { taskpool } from '@kit.ArkTS';import { common } from '@kit.AbilityKit';@Concurrentasync function concurrentTest(context: common.UIAbilityContext): Promise<boolean> {  let filePath1: string = context.filesDir + "/path1.txt"; // 应用文件路径  let filePath2: string = context.filesDir + "/path2.txt";  // 循环写文件操作  let fileList: Array<string> = [];  fileList.push(filePath1);  fileList.push(filePath2)  for (let i: number = 0; i < fileList.length; i++) {    write('Hello World!', fileList[i]).then(() => {      console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);    }).catch((err: BusinessError) => {      console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)      return false;    })  }  return true;}

👉🏻 step 2:使用TaskPool执行包含密集I/O的并发函数。

通过调用execute()方法执行任务,并在回调中进行调度结果处理。 【在TaskPool中使用context需先在并发函数外部准备好,通过入参传递给并发函数才可使用】​​​​​​​

// Index.ets@Entry@Componentstruct Index {  @State message: string = 'Hello World';  build() {    Row() {      Column() {        Text(this.message)          .fontSize(50)          .fontWeight(FontWeight.Bold)          .onClick(() => {            let context = getContext() as common.UIAbilityContext;            // 使用TaskPool执行包含密集I/O的并发函数            // 数组较大时,I/O密集型任务任务分发也会抢占主线程,需要使用多线程能力            taskpool.execute(concurrentTest, context).then(() => {              // 调度结果处理              console.info("taskpool: execute success")            })          })      }      .width('100%')    }    .height('100%')  }}

http://www.mrgr.cn/news/67970.html

相关文章:

  • 【系统架构设计师(第2版)】七、系统架构设计基础知识
  • Java 代理模式详解
  • Scala的属性访问权限(一)默认访问权限
  • 【Keepalived】Keepalived 2.3.2版本编译报错
  • 安利一款超6K+ star的可拖放响应式灵活的网格布局Gridstack.js
  • 制作python的Dockerfile
  • Scala学习记录,List
  • 嵌入式linux中设备树控制硬件的方法
  • 【初阶数据结构与算法】沉浸式刷题之顺序表练习(顺序表以及双指针两种方法)
  • Serverless云计算服务
  • Java SPI机制简单讲解
  • Markdown 全面教程:从基础到高级
  • salesforce批量修改对象字段的四种方法
  • VScode建立Java项目
  • 一文带你深度了解FreeRTOS——递归互斥信号量
  • 2024年网鼎杯青龙组|MISC全解
  • Jest项目实战(5):发布代码到 npm
  • 矩阵论 •「线性空间、基变换与向量坐标变换」
  • Jest项目实战(4):将工具库顺利迁移到GitHub的完整指南
  • yakit中的fuzztag
  • Ubuntu安装Python并配置pip阿里镜像教程 - 幽络源
  • bat批量处理脚本细节研究
  • 什么是干部民主测评系统?如何选择合适的系统?
  • 论文 | Teaching Algorithmic Reasoning via In-context Learning
  • 基于STM32的智能花园灌溉系统设计
  • golang笔记-Array(数组)