1# TaskPool简介 2 3TaskPool为应用程序提供多线程环境,降低资源消耗、提高系统性能,无需管理线程生命周期。具体接口信息及使用方法详情请见[TaskPool](../reference/apis-arkts/js-apis-taskpool.md)。 4 5## TaskPool运作机制 6 7TaskPool运作机制示意图 8 9 10 11TaskPool支持开发者在宿主线程提交任务到任务队列,系统选择合适的工作线程执行任务,再将结果返回给宿主线程。接口易用,支持任务执行、取消和指定优先级,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认启动一个任务工作线程,任务多时会扩容。工作线程数量上限取决于设备的物理核数,内部管理具体数量,确保调度和执行效率最优。长时间无任务分发时会缩容,减少工作线程数量。具体扩缩容机制详情请见[TaskPool扩缩容机制](taskpool-introduction.md#taskpool扩缩容机制)。 12 13## TaskPool注意事项 14 15- 实现任务的函数需要使用[@Concurrent装饰器](#concurrent装饰器)标注,且仅支持在.ets文件中使用。 16 17- 从API version 11开始,跨并发实例传递带方法的实例对象时,该类必须使用装饰器[@Sendable装饰器](arkts-sendable.md#sendable装饰器)标注,且仅支持在.ets文件中使用。 18 19- 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时)。否则,任务将被强制终止。 20 21- 实现任务的函数入参需满足序列化支持的类型,详情请参见[线程间通信对象](interthread-communication-overview.md)。目前不支持使用[@State装饰器](../ui/state-management/arkts-state.md)、[@Prop装饰器](../ui/state-management/arkts-prop.md)、[@Link装饰器](../ui/state-management/arkts-link.md)等装饰器修饰的复杂类型。 22 23- ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口[setTransferList()](../reference/apis-arkts/js-apis-taskpool.md#settransferlist10)设置。如果需要多次调用使用ArrayBuffer作为参数的task,则需要通过接口[setCloneList()](../reference/apis-arkts/js-apis-taskpool.md#setclonelist11)把ArrayBuffer在线程中的传输行为改成拷贝传递,避免对原有对象产生影响。 24 25 ```ts 26 import { taskpool } from '@kit.ArkTS'; 27 import { BusinessError } from '@kit.BasicServicesKit'; 28 29 @Concurrent 30 function printArrayBuffer(buffer:ArrayBuffer) { 31 return buffer 32 } 33 34 function testArrayBuffer() { 35 let buffer = new ArrayBuffer(1); 36 let group = new taskpool.TaskGroup(); 37 let task = new taskpool.Task(printArrayBuffer, buffer); 38 group.addTask(task); 39 task.setCloneList([buffer]); 40 for (let i = 0; i < 5; i++) { 41 taskpool.execute(group).then(() => { 42 console.info("execute group success"); 43 }).catch((e: BusinessError) => { 44 console.error("execute group error: " + e.message); 45 }) 46 } 47 } 48 ``` 49 50- 由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。 51 52- 序列化传输的数据量限制为16MB。 53 54- [Priority](../reference/apis-arkts/js-apis-taskpool.md#priority)的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级的任务只在所有线程都空闲时触发执行,并且只会占用一个线程。 55 56- Promise不支持跨线程传递。TaskPool返回pending或rejected状态的Promise时失败,返回fulfilled状态的Promise时TaskPool会解析返回的结果,如果结果可以跨线程传递,则返回成功。 57 58- 不支持在TaskPool工作线程中使用[AppStorage](../ui/state-management/arkts-appstorage.md)。 59 60- TaskPool支持开发者在宿主线程封装任务并提交给任务队列,理论上支持的任务数量没有上限。然而,任务的执行效率受限于任务的优先级和系统资源。当工作线程达到最大数量时,任务的执行效率可能会下降。 61 62- TaskPool不支持指定任务所运行的线程,任务会被分配到空闲的线程中执行。如果需要指定任务所运行的线程,建议使用[Worker](worker-introduction.md)。 63 64## \@Concurrent装饰器 65 66在使用[TaskPool](../reference/apis-arkts/js-apis-taskpool.md)时,执行的并发函数需要使用该装饰器修饰,否则无法通过相关校验。 67 68> **说明:** 69> 70> 从API version 9开始,支持使用\@Concurrent装饰器声明并校验并发函数。 71 72### 装饰器说明 73 74| \@Concurrent并发装饰器 | 说明 | 75| -------- | -------- | 76| 装饰器参数 | 无。 | 77| 使用场景 | 仅支持在Stage模型的工程中使用。仅支持在.ets文件中使用。 | 78| 装饰的函数类型 | 允许标注async函数或普通函数。禁止标注generator、箭头函数、类方法。不支持类成员函数或者匿名函数。 | 79| 装饰的函数内的变量类型 | 允许使用local变量、入参和通过import引入的变量。禁止使用闭包变量。 | 80| 装饰的函数内的返回值类型 | 支持的类型请查[线程间通信对象](interthread-communication-overview.md)。 | 81 82> **说明:** 83> 84> 由于\@Concurrent标记的函数不能访问闭包,因此函数内部不能调用当前文件的其他函数,例如: 85> 86> ```ts 87> function bar() { 88> } 89> 90> @Concurrent 91> function foo() { 92> bar(); // 违反闭包原则,报错 93> } 94> ``` 95 96### 装饰器使用示例 97 98#### 并发函数一般使用 99 100并发函数为一个计算两数之和的普通函数,taskpool执行该函数并返回结果。 101 102示例: 103 104```ts 105import { taskpool } from '@kit.ArkTS'; 106 107@Concurrent 108function add(num1: number, num2: number): number { 109 return num1 + num2; 110} 111 112async function ConcurrentFunc(): Promise<void> { 113 try { 114 let task: taskpool.Task = new taskpool.Task(add, 1, 2); 115 console.info("taskpool res is: " + await taskpool.execute(task)); 116 } catch (e) { 117 console.error("taskpool execute error is: " + e); 118 } 119} 120 121@Entry 122@Component 123struct Index { 124 @State message: string = 'Hello World' 125 126 build() { 127 Row() { 128 Column() { 129 Text(this.message) 130 .fontSize(50) 131 .fontWeight(FontWeight.Bold) 132 .onClick(() => { 133 ConcurrentFunc(); 134 }) 135 } 136 .width('100%') 137 } 138 .height('100%') 139 } 140} 141``` 142 143#### 并发函数返回Promise 144 145并发函数中返回Promise时需要特别关注。如示例所示,testPromise和testPromise1等并发同步函数会处理该Promise并返回结果。 146 147示例: 148 149```ts 150import { taskpool } from '@kit.ArkTS'; 151 152@Concurrent 153function testPromise(args1: number, args2: number): Promise<number> { 154 return new Promise<number>((resolve, reject)=>{ 155 resolve(args1 + args2); 156 }); 157} 158 159@Concurrent 160async function testPromise1(args1: number, args2: number): Promise<number> { 161 return new Promise<number>((resolve, reject)=>{ 162 resolve(args1 + args2); 163 }); 164} 165 166@Concurrent 167async function testPromise2(args1: number, args2: number): Promise<number> { 168 return await new Promise<number>((resolve, reject)=>{ 169 resolve(args1 + args2); 170 }); 171} 172 173@Concurrent 174function testPromise3() { 175 return Promise.resolve(1); 176} 177 178@Concurrent 179async function testPromise4(): Promise<number> { 180 return 1; 181} 182 183@Concurrent 184async function testPromise5(): Promise<string> { 185 return await new Promise((resolve) => { 186 setTimeout(()=>{ 187 resolve("Promise setTimeout after resolve"); 188 }, 1000) 189 }); 190} 191 192async function testConcurrentFunc() { 193 let task1: taskpool.Task = new taskpool.Task(testPromise, 1, 2); 194 let task2: taskpool.Task = new taskpool.Task(testPromise1, 1, 2); 195 let task3: taskpool.Task = new taskpool.Task(testPromise2, 1, 2); 196 let task4: taskpool.Task = new taskpool.Task(testPromise3); 197 let task5: taskpool.Task = new taskpool.Task(testPromise4); 198 let task6: taskpool.Task = new taskpool.Task(testPromise5); 199 200 taskpool.execute(task1).then((d:object)=>{ 201 console.info("task1 res is: " + d); 202 }).catch((e:object)=>{ 203 console.info("task1 catch e: " + e); 204 }) 205 taskpool.execute(task2).then((d:object)=>{ 206 console.info("task2 res is: " + d); 207 }).catch((e:object)=>{ 208 console.info("task2 catch e: " + e); 209 }) 210 taskpool.execute(task3).then((d:object)=>{ 211 console.info("task3 res is: " + d); 212 }).catch((e:object)=>{ 213 console.info("task3 catch e: " + e); 214 }) 215 taskpool.execute(task4).then((d:object)=>{ 216 console.info("task4 res is: " + d); 217 }).catch((e:object)=>{ 218 console.info("task4 catch e: " + e); 219 }) 220 taskpool.execute(task5).then((d:object)=>{ 221 console.info("task5 res is: " + d); 222 }).catch((e:object)=>{ 223 console.info("task5 catch e: " + e); 224 }) 225 taskpool.execute(task6).then((d:object)=>{ 226 console.info("task6 res is: " + d); 227 }).catch((e:object)=>{ 228 console.info("task6 catch e: " + e); 229 }) 230} 231 232@Entry 233@Component 234struct Index { 235 @State message: string = 'Hello World'; 236 237 build() { 238 Row() { 239 Column() { 240 Button(this.message) 241 .fontSize(50) 242 .fontWeight(FontWeight.Bold) 243 .onClick(() => { 244 testConcurrentFunc(); 245 }) 246 } 247 .width('100%') 248 } 249 .height('100%') 250 } 251} 252``` 253 254#### 并发函数中使用自定义类或函数 255 256在并发函数中使用自定义类或函数时,需将其定义在不同的文件中,否则会被认为是闭包。如以下示例所示。 257 258示例: 259 260```ts 261// Index.ets 262import { taskpool } from '@kit.ArkTS'; 263import { BusinessError } from '@kit.BasicServicesKit'; 264import { testAdd, MyTestA, MyTestB } from './Test'; 265 266function add(arg: number) { 267 return ++arg; 268} 269 270class TestA { 271 constructor(name: string) { 272 this.name = name; 273 } 274 name: string = 'ClassA'; 275} 276 277class TestB { 278 static nameStr: string = 'ClassB'; 279} 280 281@Concurrent 282function TestFunc() { 283 // case1:在并发函数中直接调用同文件内定义的类或函数 284 285 // 直接调用同文件定义的函数add(),add飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 286 // add(1); 287 // 直接使用同文件定义的TestA构造,TestA飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 288 // let a = new TestA("aaa"); 289 // 直接访问同文件定义的TestB的成员nameStr,TestB飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 290 // console.info("TestB name is: " + TestB.nameStr); 291 292 // case2:在并发函数中调用定义在Test.ets文件并导入当前文件的类或函数 293 294 // 输出结果:res1 is: 2 295 console.info("res1 is: " + testAdd(1)); 296 let tmpStr = new MyTestA("TEST A"); 297 // 输出结果:res2 is: TEST A 298 console.info("res2 is: " + tmpStr.name); 299 // 输出结果:res3 is: MyTestB 300 console.info("res3 is: " + MyTestB.nameStr); 301} 302 303 304@Entry 305@Component 306struct Index { 307 @State message: string = 'Hello World'; 308 309 build() { 310 RelativeContainer() { 311 Text(this.message) 312 .id('HelloWorld') 313 .fontSize(50) 314 .fontWeight(FontWeight.Bold) 315 .alignRules({ 316 center: { anchor: '__container__', align: VerticalAlign.Center }, 317 middle: { anchor: '__container__', align: HorizontalAlign.Center } 318 }) 319 .onClick(() => { 320 let task = new taskpool.Task(TestFunc); 321 taskpool.execute(task).then(() => { 322 console.info("taskpool: execute task success!"); 323 }).catch((e:BusinessError) => { 324 console.error(`taskpool: execute: Code: ${e.code}, message: ${e.message}`); 325 }) 326 }) 327 } 328 .height('100%') 329 .width('100%') 330 } 331} 332``` 333 334```ts 335// Test.ets 336export function testAdd(arg: number) { 337 return ++arg; 338} 339 340@Sendable 341export class MyTestA { 342 constructor(name: string) { 343 this.name = name; 344 } 345 name: string = 'MyTestA'; 346} 347 348export class MyTestB { 349 static nameStr:string = 'MyTestB'; 350} 351``` 352 353#### 并发异步函数中使用Promise 354 355在并发异步函数中使用Promise时,建议搭配await使用。这样TaskPool可以捕获Promise中的异常。推荐使用示例如下。 356 357示例: 358 359```ts 360import { taskpool } from '@kit.ArkTS' 361 362@Concurrent 363async function testPromiseError() { 364 await new Promise<number>((resolve, reject) => { 365 resolve(1); 366 }).then(()=>{ 367 throw new Error("testPromise Error"); 368 }) 369} 370 371@Concurrent 372async function testPromiseError1() { 373 await new Promise<string>((resolve, reject) => { 374 reject("testPromiseError1 Error msg"); 375 }) 376} 377 378@Concurrent 379function testPromiseError2() { 380 return new Promise<string>((resolve, reject) => { 381 reject("testPromiseError2 Error msg"); 382 }) 383} 384 385async function testConcurrentFunc() { 386 let task1: taskpool.Task = new taskpool.Task(testPromiseError); 387 let task2: taskpool.Task = new taskpool.Task(testPromiseError1); 388 let task3: taskpool.Task = new taskpool.Task(testPromiseError2); 389 390 taskpool.execute(task1).then((d:object)=>{ 391 console.info("task1 res is: " + d); 392 }).catch((e:object)=>{ 393 console.info("task1 catch e: " + e); 394 }) 395 taskpool.execute(task2).then((d:object)=>{ 396 console.info("task2 res is: " + d); 397 }).catch((e:object)=>{ 398 console.info("task2 catch e: " + e); 399 }) 400 taskpool.execute(task3).then((d:object)=>{ 401 console.info("task3 res is: " + d); 402 }).catch((e:object)=>{ 403 console.info("task3 catch e: " + e); 404 }) 405} 406 407@Entry 408@Component 409struct Index { 410 @State message: string = 'Hello World'; 411 412 build() { 413 Row() { 414 Column() { 415 Button(this.message) 416 .fontSize(50) 417 .fontWeight(FontWeight.Bold) 418 .onClick(() => { 419 testConcurrentFunc(); 420 }) 421 } 422 .width('100%') 423 } 424 .height('100%') 425 } 426} 427``` 428 429## TaskPool扩缩容机制 430 431### 扩容机制 432 433一般情况下,开发者向任务队列提交任务时会触发扩容检测。扩容检测首先判断当前空闲的工作线程数是否大于任务数,如果大于,说明线程池中存在空闲工作线程,无需扩容。否则,通过负载计算确定所需工作线程数并创建。 434 435### 缩容机制 436 437扩容后,TaskPool新建多个工作线程,但当任务数减少后,这些线程就会处于空闲状态,造成资源浪费,因此TaskPool提供缩容机制。TaskPool使用了定时器,定时检测当前负载。定时器30s触发一次,每次尝试释放空闲的工作线程。释放的线程需要满足如下条件: 438 439- 该线程空闲时长达到30s。 440 441- 该线程上未执行长时任务([LongTask](../reference/apis-arkts/js-apis-taskpool.md#longtask12))。 442 443- 该线程上没有业务申请且未释放的句柄,例如[Timer(定时器)](../reference/common/js-apis-timer.md)。 444 445- 该线程处于非调试调优阶段。 446 447- 该线程中不存在已创建未销毁的子Worker。