鸿蒙多线程开发——TaskPool任务池
1、前 言
目前ArkTS提供了TaskPool和Worker两种并发能力,TaskPool和Worker都基于Actor并发模型实现(关于Actor并发模型和内存共享模型的区别,参考之前的文章 鸿蒙多线程开发——并发模型对比(Actor与内存共享))。
本文先针对TaskPool做介绍。
2、TaskPool
2.1、基本介绍
TaskPool 作用是为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能,且您无需关心线程实例的生命周期。
有一定开发经验的朋友应该对线程池不陌生。大致描述如下:
TaskPool允许开发者在主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的能力,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,具体数量内部管理,保证最优的调度及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。示意图如下:
2.2、注意事项
在使用TaskPool时,有一些需要注意的事项,如下:
-
实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用。
-
从API version 11开始,实现任务的函数需要使用类方法时,该类必须使用装饰器@Sendable装饰器标注,且仅支持在.ets文件中使用。
-
任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。
-
实现任务的函数入参需满足序列化支持的类型。
-
ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。
-
由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。
-
序列化传输的数据量大小限制为16MB。
-
Priority的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份。),它的优先级别是最低的。这种优先级标记的任务只会在所有线程都空闲的情况下触发执行,并且只会占用一个线程来执行。
-
Promise不支持跨线程传递,不能作为concurrent function的返回值。
-
不支持在TaskPool工作线程中使用AppStorage。
正面案例:
// 正例
@Concurrent
async function asyncFunc(val1:number, val2:number): Promise<number> {
let ret: number = await new Promise((resolve, reject) => {
let value = val1 + val2;
resolve(value);
});
return ret; // 支持。直接返回Promise的结果。
}
function taskpoolExecute() {
taskpool.execute(asyncFunc, 10, 20).then((result: Object) => {
console.info("taskPoolTest task result: " + result);
}).catch((err: string) => {
console.error("taskPoolTest test occur error: " + err);
});
}
taskpoolExecute()
反面案例:
// 反例1:
@Concurrent
async function asyncFunc(val1:number, val2:number): Promise<number> {
let ret: number = await new Promise((resolve, reject) => {
let value = val1 + val2;
resolve(value);
});
return Promise.resolve(ret); // 不支持。Promise.resolve仍是Promise,其状态是pending,无法作为返回值使用。
}
// 反例2:
@Concurrent
async function asyncFunc(val1:number, val2:number): Promise<number> {
// 不支持。其状态是pending,无法作为返回值使用。
return new Promise((resolve, reject) => {
setTimeout(() => {
let value = val1 + val2;
resolve(value);
}, 2000);
});
}
function taskpoolExecute() {
taskpool.execute(asyncFunc, 10, 20).then((result: Object) => {
console.info("taskPoolTest task result: " + result);
}).catch((err: string) => {
console.error("taskPoolTest test occur error: " + err);
});
}
taskpoolExecute()
3、TaskPool接口介绍
-
模块导入
import { taskpool } from '@kit.ArkTS';
-
核心API
// 将待执行的函数放入taskpool内部任务队列 (函数不会立即执行,而是等待分发到工作线程执行。当前执行模式不可取消任务)
execute(func: Function, ...args: Object[]): Promise<Object>
// 将创建好的任务放入taskpool内部任务队列 (任务不会立即执行,而是等待分发到工作线程执行。当前执行模式可以设置任务优先级和尝试调用cancel进行任务取消。该任务不可以是任务组任务和串行)
execute(task: Task, priority?: Priority): Promise<Object>
// 延时执行任务。当前执行模式可以设置任务优先级和尝试调用cancel进行任务取消。该任务不可以是任务组任务、串行队列任务和周期任务。若该任务非长时任务,可以多次调用executeDelayed执行,长时任务仅支持执行一次。
executeDelayed(delayTime: number, task: Task, priority?: Priority): Promise<Object>
// 周期执行任务,每隔period时长执行一次任务。当前执行模式支持设置任务优先级和调用cancel取消任务周期执行。周期任务不可以是任务组任务和串行队列任务,不可以再次调用执行接口,不可以拥有依赖关系。
executePeriodically(period: number, task: Task, priority?: Priority): void
// 取消任务池中的任务。当任务在taskpool等待队列中,取消该任务后该任务将不再执行,并返回undefined作为结果;当任务已经在taskpool工作线程执行,取消该任务并不影响任务继续执行,执行结果在catch分支返回,搭配isCanceled使用可以对任务取消行为作出响应。taskpool.cancel对其之前的taskpool.execute/taskpool.executeDelayed生效。
cancel(task: Task): void
// 取消任务池中的任务组。当一个任务组的任务未全部执行结束时取消任务组,返回undefined作为任务组结果。
cancel(group: TaskGroup): void
// 中止任务池中的长时任务,在长时任务执行完成后调用。中止后,执行长时任务的线程可能会被回收。
terminateTask(longTask: LongTask): void
// 检查函数是否为并发函数。
isConcurrent(func: Function): boolean
// 获取任务池内部信息,包含线程信息和任务信息。
getTaskPoolInfo(): TaskPoolInfo
在核心API中,有几个关键的类或枚举:Priority、Task。定义如下:
enum Priority {
HIGH = 0, //任务为高优先级。
MEDIUM = 1, // 任务为中优先级。
LOW = 2, // 任务为低优先级。
IDLE = 3 //任务为后台任务。
}
class Task {
// 属性
function: Function // 创建任务时需要传入的函数,支持的函数返回值类型请查序列化支持类型。
arguments: Object[] // 创建任务传入函数所需的参数,支持的参数类型请查序列化支持类型。
name: string // 创建任务时指定的任务名称。
totalDuration: number // 执行任务总耗时。
ioDuration: number // 执行任务异步IO耗时。
cpuDuration: number // 执行任务CPU耗时。
// 方法
// 构造函数
constructor(func: Function, ...args: Object[])
constructor(name: string, func: Function, ...args: Object[])
static isCanceled(): boolean; // 任务是否已经取消
isDone(): boolean // 检查任务是否已完成。
/**
设置任务的传输列表。
此接口可以设置任务池中ArrayBuffer的transfer列表,transfer列表中的ArrayBuffer对象在传输时不会复制buffer内容到工作线程而是转移buffer控制权至工作线程,传输后当前的ArrayBuffer失效。若ArrayBuffer为空,则不会transfer转移。
*/
setTransferList(transfer?: ArrayBuffer[]): void
/**
设置任务的拷贝列表。
需搭配@Sendable装饰器使用,否则会抛异常。
*/
setCloneList(cloneList: Object[] | ArrayBuffer[]): void
// 在任务执行过程中向宿主线程发送消息并触发回调。使用该方法前需要先构造Task。
static sendData(...args: Object[]): void
// 为任务注册回调函数,以接收和处理来自任务池工作线程的数据。使用该方法前需要先构造Task。
onReceiveData(callback?: Function): void
// 为当前任务添加对其他任务的依赖。使用该方法前需要先构造Task。该任务和被依赖的任务不可以是任务组任务、串行队列任务、已执行的任务和周期任务。存在依赖关系的任务(依赖其他任务的任务或被依赖的任务)执行后不可以再次执行。
addDependency(...tasks: Task[]): void
// 删除当前任务对其他任务的依赖。使用该方法前需要先构造Task。
removeDependency(...tasks: Task[]): void
// 注册一个回调函数,并在任务入队时调用它。需在任务执行前注册,否则会抛异常。
onEnqueued(callback: CallbackFunction): void
// 注册一个回调函数,并在执行任务前调用它。需在任务执行前注册,否则会抛异常。
onStartExecution(callback: CallbackFunction): void
// 注册一个回调函数,并在任务执行失败时调用它。需在任务执行前注册,否则会抛异常。
onExecutionFailed(callback: CallbackFunctionWithError): void
// 注册一个回调函数,并在任务执行成功时调用它。需在任务执行前注册,否则会抛异常。
onExecutionSucceeded(callback: CallbackFunction): void
}
4、案例
此处以频繁读写系统文件来模拟I/O密集型并发任务的处理。
👉🏻 step 1:定义并发函数,内部密集调用I/O能力。
// write.ets
import { fileIo } from '@kit.CoreFileKit'
// 定义并发函数,内部密集调用I/O能力
// 写入文件的实现
export async function write(data: string, filePath: string): Promise<void> {
let file: fileIo.File = await fileIo.open(filePath, fileIo.OpenMode.READ_WRITE | fileIo.OpenMode.CREATE);
await fileIo.write(file.fd, data);
fileIo.close(file);
}
// ...
// Index.ets
import { write } from './write'
import { BusinessError } from '@kit.BasicServicesKit';
import { taskpool } from '@kit.ArkTS';
import { common } from '@kit.AbilityKit';
@Concurrent
async function concurrentTest(context: common.UIAbilityContext): Promise<boolean> {
let filePath1: string = context.filesDir + "/path1.txt"; // 应用文件路径
let filePath2: string = context.filesDir + "/path2.txt";
// 循环写文件操作
let fileList: Array<string> = [];
fileList.push(filePath1);
fileList.push(filePath2)
for (let i: number = 0; i < fileList.length; i++) {
write('Hello World!', fileList[i]).then(() => {
console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);
}).catch((err: BusinessError) => {
console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)
return false;
})
}
return true;
}
👉🏻 step 2:使用TaskPool执行包含密集I/O的并发函数。
通过调用execute()方法执行任务,并在回调中进行调度结果处理。 【在TaskPool中使用context需先在并发函数外部准备好,通过入参传递给并发函数才可使用】
// Index.ets
@Entry
@Component
struct Index {
@State message: string = 'Hello World';
build() {
Row() {
Column() {
Text(this.message)
.fontSize(50)
.fontWeight(FontWeight.Bold)
.onClick(() => {
let context = getContext() as common.UIAbilityContext;
// 使用TaskPool执行包含密集I/O的并发函数
// 数组较大时,I/O密集型任务任务分发也会抢占主线程,需要使用多线程能力
taskpool.execute(concurrentTest, context).then(() => {
// 调度结果处理
console.info("taskpool: execute success")
})
})
}
.width('100%')
}
.height('100%')
}
}