鸿蒙多线程开发——TaskPool任务池

1、前 言
目前ArkTS提供了TaskPool和Worker两种并发能力,TaskPool和Worker都基于Actor并发模型实现(关于Actor并发模型和内存共享模型的区别,参考之前的文章 鸿蒙多线程开发——并发模型对比(Actor与内存共享))。
本文先针对TaskPool做介绍。
2、TaskPool
2.1、基本介绍
TaskPool 作用是为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能,且您无需关心线程实例的生命周期。
有一定开发经验的朋友应该对线程池不陌生。大致描述如下:
TaskPool允许开发者在主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的能力,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,具体数量内部管理,保证最优的调度及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。示意图如下:

2.2、注意事项
在使用TaskPool时,有一些需要注意的事项,如下:
-
实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用。
-
从API version 11开始,实现任务的函数需要使用类方法时,该类必须使用装饰器@Sendable装饰器标注,且仅支持在.ets文件中使用。
-
任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。
-
实现任务的函数入参需满足序列化支持的类型。
-
ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。
-
由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。
-
序列化传输的数据量大小限制为16MB。
-
Priority的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份。),它的优先级别是最低的。这种优先级标记的任务只会在所有线程都空闲的情况下触发执行,并且只会占用一个线程来执行。
-
Promise不支持跨线程传递,不能作为concurrent function的返回值。
-
不支持在TaskPool工作线程中使用AppStorage。
正面案例:
// 正例@Concurrentasync function asyncFunc(val1:number, val2:number): Promise<number> {let ret: number = await new Promise((resolve, reject) => {let value = val1 + val2;resolve(value);});return ret; // 支持。直接返回Promise的结果。}function taskpoolExecute() {taskpool.execute(asyncFunc, 10, 20).then((result: Object) => {console.info("taskPoolTest task result: " + result);}).catch((err: string) => {console.error("taskPoolTest test occur error: " + err);});}taskpoolExecute()
反面案例:
// 反例1:@Concurrentasync function asyncFunc(val1:number, val2:number): Promise<number> {let ret: number = await new Promise((resolve, reject) => {let value = val1 + val2;resolve(value);});return Promise.resolve(ret); // 不支持。Promise.resolve仍是Promise,其状态是pending,无法作为返回值使用。}// 反例2:@Concurrentasync function asyncFunc(val1:number, val2:number): Promise<number> {// 不支持。其状态是pending,无法作为返回值使用。return new Promise((resolve, reject) => {setTimeout(() => {let value = val1 + val2;resolve(value);}, 2000);});}function taskpoolExecute() {taskpool.execute(asyncFunc, 10, 20).then((result: Object) => {console.info("taskPoolTest task result: " + result);}).catch((err: string) => {console.error("taskPoolTest test occur error: " + err);});}taskpoolExecute()
3、TaskPool接口介绍
-
模块导入
import { taskpool } from '@kit.ArkTS';
-
核心API
// 将待执行的函数放入taskpool内部任务队列 (函数不会立即执行,而是等待分发到工作线程执行。当前执行模式不可取消任务)execute(func: Function, ...args: Object[]): Promise<Object>// 将创建好的任务放入taskpool内部任务队列 (任务不会立即执行,而是等待分发到工作线程执行。当前执行模式可以设置任务优先级和尝试调用cancel进行任务取消。该任务不可以是任务组任务和串行)execute(task: Task, priority?: Priority): Promise<Object>// 延时执行任务。当前执行模式可以设置任务优先级和尝试调用cancel进行任务取消。该任务不可以是任务组任务、串行队列任务和周期任务。若该任务非长时任务,可以多次调用executeDelayed执行,长时任务仅支持执行一次。executeDelayed(delayTime: number, task: Task, priority?: Priority): Promise<Object>// 周期执行任务,每隔period时长执行一次任务。当前执行模式支持设置任务优先级和调用cancel取消任务周期执行。周期任务不可以是任务组任务和串行队列任务,不可以再次调用执行接口,不可以拥有依赖关系。executePeriodically(period: number, task: Task, priority?: Priority): void// 取消任务池中的任务。当任务在taskpool等待队列中,取消该任务后该任务将不再执行,并返回undefined作为结果;当任务已经在taskpool工作线程执行,取消该任务并不影响任务继续执行,执行结果在catch分支返回,搭配isCanceled使用可以对任务取消行为作出响应。taskpool.cancel对其之前的taskpool.execute/taskpool.executeDelayed生效。cancel(task: Task): void// 取消任务池中的任务组。当一个任务组的任务未全部执行结束时取消任务组,返回undefined作为任务组结果。cancel(group: TaskGroup): void// 中止任务池中的长时任务,在长时任务执行完成后调用。中止后,执行长时任务的线程可能会被回收。terminateTask(longTask: LongTask): void// 检查函数是否为并发函数。isConcurrent(func: Function): boolean// 获取任务池内部信息,包含线程信息和任务信息。getTaskPoolInfo(): TaskPoolInfo
在核心API中,有几个关键的类或枚举:Priority、Task。定义如下:
enum Priority {HIGH = 0, //任务为高优先级。MEDIUM = 1, // 任务为中优先级。LOW = 2, // 任务为低优先级。IDLE = 3 //任务为后台任务。}class Task {// 属性function: Function // 创建任务时需要传入的函数,支持的函数返回值类型请查序列化支持类型。arguments: Object[] // 创建任务传入函数所需的参数,支持的参数类型请查序列化支持类型。name: string // 创建任务时指定的任务名称。totalDuration: number // 执行任务总耗时。ioDuration: number // 执行任务异步IO耗时。cpuDuration: number // 执行任务CPU耗时。// 方法// 构造函数constructor(func: Function, ...args: Object[])constructor(name: string, func: Function, ...args: Object[])static isCanceled(): boolean; // 任务是否已经取消isDone(): boolean // 检查任务是否已完成。/**设置任务的传输列表。此接口可以设置任务池中ArrayBuffer的transfer列表,transfer列表中的ArrayBuffer对象在传输时不会复制buffer内容到工作线程而是转移buffer控制权至工作线程,传输后当前的ArrayBuffer失效。若ArrayBuffer为空,则不会transfer转移。*/setTransferList(transfer?: ArrayBuffer[]): void/**设置任务的拷贝列表。需搭配@Sendable装饰器使用,否则会抛异常。*/setCloneList(cloneList: Object[] | ArrayBuffer[]): void// 在任务执行过程中向宿主线程发送消息并触发回调。使用该方法前需要先构造Task。static sendData(...args: Object[]): void// 为任务注册回调函数,以接收和处理来自任务池工作线程的数据。使用该方法前需要先构造Task。onReceiveData(callback?: Function): void// 为当前任务添加对其他任务的依赖。使用该方法前需要先构造Task。该任务和被依赖的任务不可以是任务组任务、串行队列任务、已执行的任务和周期任务。存在依赖关系的任务(依赖其他任务的任务或被依赖的任务)执行后不可以再次执行。addDependency(...tasks: Task[]): void// 删除当前任务对其他任务的依赖。使用该方法前需要先构造Task。removeDependency(...tasks: Task[]): void// 注册一个回调函数,并在任务入队时调用它。需在任务执行前注册,否则会抛异常。onEnqueued(callback: CallbackFunction): void// 注册一个回调函数,并在执行任务前调用它。需在任务执行前注册,否则会抛异常。onStartExecution(callback: CallbackFunction): void// 注册一个回调函数,并在任务执行失败时调用它。需在任务执行前注册,否则会抛异常。onExecutionFailed(callback: CallbackFunctionWithError): void// 注册一个回调函数,并在任务执行成功时调用它。需在任务执行前注册,否则会抛异常。onExecutionSucceeded(callback: CallbackFunction): void}
4、案例
此处以频繁读写系统文件来模拟I/O密集型并发任务的处理。
👉🏻 step 1:定义并发函数,内部密集调用I/O能力。
// write.etsimport { fileIo } from '@kit.CoreFileKit'// 定义并发函数,内部密集调用I/O能力// 写入文件的实现export async function write(data: string, filePath: string): Promise<void> {let file: fileIo.File = await fileIo.open(filePath, fileIo.OpenMode.READ_WRITE | fileIo.OpenMode.CREATE);await fileIo.write(file.fd, data);fileIo.close(file);}// ...// Index.etsimport { write } from './write'import { BusinessError } from '@kit.BasicServicesKit';import { taskpool } from '@kit.ArkTS';import { common } from '@kit.AbilityKit';@Concurrentasync function concurrentTest(context: common.UIAbilityContext): Promise<boolean> {let filePath1: string = context.filesDir + "/path1.txt"; // 应用文件路径let filePath2: string = context.filesDir + "/path2.txt";// 循环写文件操作let fileList: Array<string> = [];fileList.push(filePath1);fileList.push(filePath2)for (let i: number = 0; i < fileList.length; i++) {write('Hello World!', fileList[i]).then(() => {console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);}).catch((err: BusinessError) => {console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)return false;})}return true;}
👉🏻 step 2:使用TaskPool执行包含密集I/O的并发函数。
通过调用execute()方法执行任务,并在回调中进行调度结果处理。 【在TaskPool中使用context需先在并发函数外部准备好,通过入参传递给并发函数才可使用】
// Index.ets@Entry@Componentstruct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {let context = getContext() as common.UIAbilityContext;// 使用TaskPool执行包含密集I/O的并发函数// 数组较大时,I/O密集型任务任务分发也会抢占主线程,需要使用多线程能力taskpool.execute(concurrentTest, context).then(() => {// 调度结果处理console.info("taskpool: execute success")})})}.width('100%')}.height('100%')}}
