1# TaskPool简介 2<!--Kit: ArkTS--> 3<!--Subsystem: CommonLibrary--> 4<!--Owner: @wang_zhaoyong--> 5<!--Designer: @weng-changcheng--> 6<!--Tester: @kirl75; @zsw_zhushiwei--> 7<!--Adviser: @ge-yafang--> 8 9TaskPool为应用程序提供多线程环境,降低资源消耗并提高系统性能。无需管理线程生命周期。具体接口信息及使用方法,请参见[TaskPool](../reference/apis-arkts/js-apis-taskpool.md)。 10 11## TaskPool运作机制 12 13TaskPool运作机制示意图 14 15 16 17TaskPool支持在宿主线程提交任务到任务队列,系统选择合适的工作线程执行任务,并将结果返回给宿主线程。接口易用,支持任务执行、取消和指定优先级。通过系统统一线程管理,结合动态调度和负载均衡算法,可以节约系统资源。系统默认启动一个任务工作线程,任务多时会自动扩容。工作线程数量上限由设备的物理核数决定,内部管理具体数量,确保调度和执行效率最优。长时间无任务分发时会缩容,减少工作线程数量。具体扩缩容机制请参见[TaskPool扩缩容机制](taskpool-introduction.md#taskpool扩缩容机制)。 18 19## TaskPool注意事项 20 21- 实现任务的函数需要使用[@Concurrent装饰器](#concurrent装饰器)标注,且仅支持在.ets文件中使用。 22 23- 从API version 11开始,跨并发实例传递带方法的实例对象时,该类必须使用装饰器[@Sendable装饰器](arkts-sendable.md#sendable装饰器)标注,且仅支持在.ets文件中使用。 24 25- 任务函数([LongTask](../reference/apis-arkts/js-apis-taskpool.md#longtask12)除外)在TaskPool工作线程中的执行时长不能超过3分钟。否则,任务将被强制终止。需要注意的是,这里的3分钟限制仅统计TaskPool线程的同步运行时长,不包含异步操作(如Promise或async/await)的等待时长。例如,数据库的插入、删除、更新等操作,如果是异步操作,仅计入CPU实际处理时长(如SQL解析),网络传输或磁盘I/O等待时长不计入;如果是同步操作,整个操作时长(含I/O阻塞时间)均计入限制。开发者可通过[Task](../reference/apis-arkts/js-apis-taskpool.md#task)的属性ioDuration、cpuDuration获取执行当前任务的异步IO耗时和CPU耗时。 26 27- 实现任务的函数入参需满足序列化支持的类型。详情请参见[线程间通信对象](interthread-communication-overview.md)。目前不支持使用[@State装饰器](../ui/state-management/arkts-state.md)、[@Prop装饰器](../ui/state-management/arkts-prop.md)、[@Link装饰器](../ui/state-management/arkts-link.md)等装饰器修饰的复杂类型。 28 29- ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口[setTransferList()](../reference/apis-arkts/js-apis-taskpool.md#settransferlist10)设置。如果需要多次调用使用ArrayBuffer作为参数的task,则需要通过接口[setCloneList()](../reference/apis-arkts/js-apis-taskpool.md#setclonelist11)把ArrayBuffer在线程中的传输行为改成拷贝传递,避免对原有对象产生影响。 30 31除上述注意事项外,使用TaskPool时还需注意[并发注意事项](multi-thread-concurrency-overview.md#并发注意事项)。 32 33 ```ts 34 import { taskpool } from '@kit.ArkTS'; 35 import { BusinessError } from '@kit.BasicServicesKit'; 36 37 @Concurrent 38 function printArrayBuffer(buffer: ArrayBuffer) { 39 return buffer; 40 } 41 42 function testArrayBuffer() { 43 const buffer = new ArrayBuffer(1); 44 const group = new taskpool.TaskGroup(); 45 const task = new taskpool.Task(printArrayBuffer, buffer); 46 group.addTask(task); 47 task.setCloneList([buffer]); 48 for (let i = 0; i < 5; i++) { 49 taskpool.execute(group).then(() => { 50 console.info('execute group success'); 51 }).catch((e: BusinessError) => { 52 console.error(`execute group error: ${e.message}`); 53 }) 54 } 55 } 56 ``` 57 58- 由于不同线程中上下文对象不同,TaskPool工作线程只能使用线程安全的库。例如,不能使用UI相关的非线程安全库。 59 60- 序列化传输的数据量限制为16MB。 61 62- [Priority](../reference/apis-arkts/js-apis-taskpool.md#priority)的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级的任务只在所有线程都空闲时触发执行,并且同一时间只会有一个IDLE优先级的任务执行。 63 64- Promise不支持跨线程传递。TaskPool返回pending或rejected状态的Promise时会失败,返回fulfilled状态的Promise时TaskPool会解析返回的结果,如果结果可以跨线程传递,则返回成功。 65 66- 不支持在TaskPool工作线程中使用[AppStorage](../ui/state-management/arkts-appstorage.md)。 67 68- TaskPool支持在宿主线程封装任务并提交给任务队列,理论上支持的任务数量没有上限。然而,任务的执行效率受限于任务的优先级和系统资源。当工作线程达到最大数量时,任务的执行效率可能会下降 69 70- TaskPool不支持指定任务所运行的线程,任务会被分配到空闲的线程中执行。如果需要指定任务所运行的线程,建议使用[Worker](worker-introduction.md)。 71 72## \@Concurrent装饰器 73 74使用[TaskPool](../reference/apis-arkts/js-apis-taskpool.md)时,执行的并发函数必须用该装饰器修饰,否则无法通过校验。 75 76> **说明:** 77> 78> 从API version 9开始,支持使用\@Concurrent装饰器声明并校验并发函数。 79 80| \@Concurrent并发装饰器 | 说明 | 81| -------- | -------- | 82| 装饰器参数 | 无。 | 83| 使用场景 | 仅支持在Stage模型的工程中使用。仅支持在.ets文件中使用。 | 84| 装饰的函数类型 | 允许标注为async函数或普通函数。禁止标注为generator、箭头函数、类方法。不支持类成员函数或者匿名函数。 | 85| 装饰的函数内的变量类型 | 允许使用局部变量、入参和通过import引入的变量,禁止使用闭包变量。 | 86| 装饰的函数内的返回值类型 | 支持的类型请查[线程间通信对象](interthread-communication-overview.md)。 | 87 88> **说明:** 89> 90> 由于\@Concurrent标记的函数不能访问闭包,因此函数内部不能调用当前文件的其他函数,例如: 91> 92> ```ts 93> function bar() { 94> } 95> 96> @Concurrent 97> function foo() { 98> bar(); // 违反闭包原则,报错 99> } 100> ``` 101 102## 装饰器使用示例 103 104### 并发函数一般使用 105 106并发函数为一个计算两数之和的普通函数,taskpool执行该函数并返回结果。 107 108示例: 109 110```ts 111import { taskpool } from '@kit.ArkTS'; 112 113@Concurrent 114function add(num1: number, num2: number): number { 115 return num1 + num2; 116} 117 118async function concurrentFunc(): Promise<void> { 119 try { 120 const task: taskpool.Task = new taskpool.Task(add, 1, 2); 121 console.info(`taskpool res is: ${await taskpool.execute(task)}`); // 输出结果:taskpool res is: 3 122 } catch (e) { 123 console.error(`taskpool execute error is: ${e}`); 124 } 125} 126 127@Entry 128@Component 129struct Index { 130 @State message: string = 'Hello World'; 131 132 build() { 133 Row() { 134 Column() { 135 Text(this.message) 136 .fontSize(50) 137 .fontWeight(FontWeight.Bold) 138 .onClick(() => { 139 concurrentFunc(); 140 }) 141 } 142 .width('100%') 143 } 144 .height('100%') 145 } 146} 147``` 148<!-- @[concurrent_taskpool_common_usage](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/generaluse.ets) --> 149 150### 并发函数返回Promise 151 152在并发函数中返回Promise时需特别注意。如示例所示,testPromise和testPromise1等函数需处理Promise并返回结果。 153 154示例: 155 156```ts 157import { taskpool } from '@kit.ArkTS'; 158 159@Concurrent 160function testPromise(args1: number, args2: number): Promise<number> { 161 return new Promise<number>((resolve, reject) => { 162 resolve(args1 + args2); 163 }); 164} 165 166@Concurrent 167async function testPromise1(args1: number, args2: number): Promise<number> { 168 return new Promise<number>((resolve, reject) => { 169 resolve(args1 + args2); 170 }); 171} 172 173@Concurrent 174async function testPromise2(args1: number, args2: number): Promise<number> { 175 return await new Promise<number>((resolve, reject) => { 176 resolve(args1 + args2); 177 }); 178} 179 180@Concurrent 181function testPromise3() { 182 return Promise.resolve(1); 183} 184 185@Concurrent 186async function testPromise4(): Promise<number> { 187 return 1; 188} 189 190@Concurrent 191async function testPromise5(): Promise<string> { 192 return await new Promise((resolve) => { 193 setTimeout(() => { 194 resolve('Promise setTimeout after resolve'); 195 }, 1000) 196 }); 197} 198 199async function testConcurrentFunc() { 200 const task1: taskpool.Task = new taskpool.Task(testPromise, 1, 2); 201 const task2: taskpool.Task = new taskpool.Task(testPromise1, 1, 2); 202 const task3: taskpool.Task = new taskpool.Task(testPromise2, 1, 2); 203 const task4: taskpool.Task = new taskpool.Task(testPromise3); 204 const task5: taskpool.Task = new taskpool.Task(testPromise4); 205 const task6: taskpool.Task = new taskpool.Task(testPromise5); 206 207 taskpool.execute(task1).then((d: object) => { 208 console.info(`task1 res is: ${d}`); // 输出结果:task1 res is: 3 209 }).catch((e: object) => { 210 console.error(`task1 catch e: ${e}`); 211 }) 212 taskpool.execute(task2).then((d: object) => { 213 console.info(`task2 res is: ${d}`); 214 }).catch((e: object) => { 215 console.error(`task2 catch e: ${e}`); // 输出结果:task2 catch e: Error: Can't return Promise in pending state 216 }) 217 taskpool.execute(task3).then((d: object) => { 218 console.info(`task3 res is: ${d}`); // 输出结果:task3 res is: 3 219 }).catch((e: object) => { 220 console.error(`task3 catch e: ${e}`); 221 }) 222 taskpool.execute(task4).then((d: object) => { 223 console.info(`task4 res is: ${d}`); // 输出结果:task4 res is: 1 224 }).catch((e: object) => { 225 console.error(`task4 catch e: ${e}`); 226 }) 227 taskpool.execute(task5).then((d: object) => { 228 console.info(`task5 res is: ${d}`); // 输出结果:task5 res is: 1 229 }).catch((e: object) => { 230 console.error(`task5 catch e: ${e}`); 231 }) 232 taskpool.execute(task6).then((d: object) => { 233 console.info(`task6 res is: ${d}`); // 输出结果:task6 res is: Promise setTimeout after resolve 234 }).catch((e: object) => { 235 console.error(`task6 catch e: ${e}`); 236 }) 237} 238 239@Entry 240@Component 241struct Index { 242 @State message: string = 'Hello World'; 243 244 build() { 245 Row() { 246 Column() { 247 Button(this.message) 248 .fontSize(50) 249 .fontWeight(FontWeight.Bold) 250 .onClick(() => { 251 testConcurrentFunc(); 252 }) 253 } 254 .width('100%') 255 } 256 .height('100%') 257 } 258} 259``` 260<!-- @[concurrent_taskpool_promise_return](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/returnpromise.ets) --> 261 262### 并发函数中使用自定义类或函数 263 264在并发函数中使用自定义类或函数时,需将其定义在单独的文件中,否则可能被视为闭包。如下示例所示。 265 266示例: 267 268```ts 269// Index.ets 270import { taskpool } from '@kit.ArkTS'; 271import { BusinessError } from '@kit.BasicServicesKit'; 272import { testAdd, MyTestA, MyTestB } from './Test'; 273 274function add(arg: number) { 275 return ++arg; 276} 277 278class TestA { 279 constructor(name: string) { 280 this.name = name; 281 } 282 283 name: string = 'ClassA'; 284} 285 286class TestB { 287 static nameStr: string = 'ClassB'; 288} 289 290@Concurrent 291function TestFunc() { 292 // case1:在并发函数中直接调用同文件内定义的类或函数 293 294 // 直接调用同文件定义的函数add(),add飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 295 // add(1); 296 // 直接使用同文件定义的TestA构造,TestA飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 297 // const a = new TestA('aaa'); 298 // 直接访问同文件定义的TestB的成员nameStr,TestB飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 299 // console.info(`TestB name is: ${TestB.nameStr}`); 300 301 // case2:在并发函数中调用定义在Test.ets文件并导入当前文件的类或函数 302 303 // 输出结果:res1 is: 2 304 console.info(`res1 is: ${testAdd(1)}`); 305 const tmpStr = new MyTestA('TEST A'); 306 // 输出结果:res2 is: TEST A 307 console.info(`res2 is: ${tmpStr.name}`); 308 // 输出结果:res3 is: MyTestB 309 console.info(`res3 is: ${MyTestB.nameStr}`); 310} 311 312 313@Entry 314@Component 315struct Index { 316 @State message: string = 'Hello World'; 317 318 build() { 319 RelativeContainer() { 320 Text(this.message) 321 .id('HelloWorld') 322 .fontSize(50) 323 .fontWeight(FontWeight.Bold) 324 .alignRules({ 325 center: { anchor: '__container__', align: VerticalAlign.Center }, 326 middle: { anchor: '__container__', align: HorizontalAlign.Center } 327 }) 328 .onClick(() => { 329 const task = new taskpool.Task(TestFunc); 330 taskpool.execute(task).then(() => { 331 console.info('taskpool: execute task success!'); 332 }).catch((e: BusinessError) => { 333 console.error(`taskpool: execute: Code: ${e.code}, message: ${e.message}`); 334 }) 335 }) 336 } 337 .height('100%') 338 .width('100%') 339 } 340} 341``` 342<!-- @[concurrent_taskpool_custom_class_function](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/customclasses.ets) --> 343 344```ts 345// Test.ets 346export function testAdd(arg: number) { 347 return ++arg; 348} 349 350@Sendable 351export class MyTestA { 352 constructor(name: string) { 353 this.name = name; 354 } 355 name: string = 'MyTestA'; 356} 357 358export class MyTestB { 359 static nameStr:string = 'MyTestB'; 360} 361``` 362<!-- @[concurrent_taskpool_test_resources](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/Test.ets) --> 363 364### 并发异步函数中使用Promise 365 366在并发异步函数中使用Promise时,建议搭配await使用,这样TaskPool可以捕获Promise中的异常。推荐使用示例如下。 367 368示例: 369 370```ts 371import { taskpool } from '@kit.ArkTS'; 372 373@Concurrent 374async function testPromiseError() { 375 await new Promise<number>((resolve, reject) => { 376 resolve(1); 377 }).then(() => { 378 throw new Error('testPromise error'); 379 }) 380} 381 382@Concurrent 383async function testPromiseError1() { 384 await new Promise<string>((resolve, reject) => { 385 reject('testPromiseError1 error msg'); 386 }) 387} 388 389@Concurrent 390function testPromiseError2() { 391 return new Promise<string>((resolve, reject) => { 392 reject('testPromiseError2 error msg'); 393 }) 394} 395 396async function testConcurrentFunc() { 397 const task1: taskpool.Task = new taskpool.Task(testPromiseError); 398 const task2: taskpool.Task = new taskpool.Task(testPromiseError1); 399 const task3: taskpool.Task = new taskpool.Task(testPromiseError2); 400 401 taskpool.execute(task1).then((d: object) => { 402 console.info(`task1 res is: ${d}`); 403 }).catch((e: object) => { 404 console.error(`task1 catch e: ${e}`); // task1 catch e: Error: testPromise error 405 }) 406 taskpool.execute(task2).then((d: object) => { 407 console.info(`task2 res is: ${d}`); 408 }).catch((e: object) => { 409 console.error(`task2 catch e: ${e}`); // task2 catch e: testPromiseError1 error msg 410 }) 411 taskpool.execute(task3).then((d: object) => { 412 console.info(`task3 res is: ${d}`); 413 }).catch((e: object) => { 414 console.error(`task3 catch e: ${e}`); // task3 catch e: testPromiseError2 error msg 415 }) 416} 417 418@Entry 419@Component 420struct Index { 421 @State message: string = 'Hello World'; 422 423 build() { 424 Row() { 425 Column() { 426 Button(this.message) 427 .fontSize(50) 428 .fontWeight(FontWeight.Bold) 429 .onClick(() => { 430 testConcurrentFunc(); 431 }) 432 } 433 .width('100%') 434 } 435 .height('100%') 436 } 437} 438``` 439<!-- @[concurrent_taskpool_async_promise_usage](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/asynchronousfunctions.ets) --> 440 441## TaskPool扩缩容机制 442 443### 扩容机制 444 445一般情况下,向任务队列提交任务时会触发扩容检测。扩容检测首先判断当前空闲工作线程数是否大于任务数。如果大于,说明线程池中有空闲工作线程,无需扩容。否则,通过负载计算确定所需工作线程数并创建。 446 447### 缩容机制 448 449扩容后,TaskPool创建多个工作线程,但当任务数减少后,这些线程就会处于空闲状态,造成资源浪费,因此,TaskPool提供了缩容机制。TaskPool使用定时器,每30秒检测一次当前负载,并尝试释放空闲的工作线程。释放的线程需满足以下条件: 450 451- 该线程空闲时长达到30s。 452 453- 该线程上未执行长时任务([LongTask](../reference/apis-arkts/js-apis-taskpool.md#longtask12))。 454 455- 该线程上没有业务申请且未释放的句柄,例如[Timer(定时器)](../reference/common/js-apis-timer.md)。 456 457- 该线程处于非调试调优阶段。 458 459- 该线程中不存在已创建未销毁的子Worker。