1# TaskPool简介 2 3任务池(TaskPool)作用是为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能,且您无需关心线程实例的生命周期。具体接口信息及使用方法详情请见TaskPool。 4 5## TaskPool运作机制 6 7 8TaskPool支持开发者在宿主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给宿主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的能力,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,具体数量内部管理,保证最优的调度及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。 9 10## TaskPool注意事项 11 12- 实现任务的函数需要使用[@Concurrent装饰器](#concurrent装饰器)标注,且仅支持在.ets文件中使用。 13 14- 从API version 11开始,跨并发实例传递带方法的实例对象时,该类必须使用装饰器@Sendable装饰器标注,且仅支持在.ets文件中使用。 15 16- 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。 17 18- 实现任务的函数入参需满足序列化支持的类型,详情请参见线程间通信对象。 19 20- ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。 21 22- 由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。 23 24- 序列化传输的数据量大小限制为16MB。 25 26-Priority的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级标记的任务只会在所有线程都空闲的情况下触发执行,并且只会占用一个线程来执行。 27 28- Promise不支持跨线程传递,如果TaskPool返回pending或rejected状态的Promise,会返回失败;对于fulfilled状态的Promise,TaskPool会解析返回的结果,如果结果可以跨线程传递,则返回成功。 29 30- 不支持在TaskPool工作线程中使用AppStorage。 31 32- TaskPool支持开发者在宿主线程封装任务抛给任务队列,理论上可以支持任意多的任务,但任务的执行受限于任务的优先级以及系统资源的影响,在工作线程扩容到最大后,可能会导致任务的执行效率下降。 33 34- TaskPool不支持指定任务所运行的线程,任务会被分配到空闲的线程中执行。如果需要指定任务运行的线程,建议使用Worker来实现。 35 36## \@Concurrent装饰器 37 38在使用TaskPool时,执行的并发函数需要使用该装饰器修饰,否则无法通过相关校验。 39 40> **说明:** 41> 42> 从API version 9开始,支持使用\@Concurrent装饰器声明并校验并发函数。 43 44### 装饰器说明 45 46| \@Concurrent并发装饰器 | 说明 | 47| -------- | -------- | 48| 装饰器参数 | 无。 | 49| 使用场景 | 仅支持在Stage模型的工程中使用。仅支持在.ets文件中使用。 | 50| 装饰的函数类型 | 允许标注async函数或普通函数。禁止标注generator、箭头函数、类方法。不支持类成员函数或者匿名函数。 | 51| 装饰的函数内的变量类型 | 允许使用local变量、入参和通过import引入的变量。禁止使用闭包变量。 | 52| 装饰的函数内的返回值类型 | 支持的类型请查线程间通信对象。 | 53 54> **说明:** 55> 56> 由于\@Concurrent标记的函数不能访问闭包,因此\@Concurrent标记的函数内部不能调用当前文件的其他函数,例如: 57> 58> ```ts 59> function bar() { 60> } 61> 62> @Concurrent 63> function foo() { 64> bar(); // 违反闭包原则,报错 65> } 66> ``` 67 68### 装饰器使用示例 69 70#### 并发函数一般使用 71 72并发函数为一个计算两数之和的普通函数,taskpool执行该函数并返回结果。 73 74示例: 75 76```ts 77import { taskpool } from '@kit.ArkTS'; 78 79@Concurrent 80function add(num1: number, num2: number): number { 81 return num1 + num2; 82} 83 84async function ConcurrentFunc(): Promise<void> { 85 try { 86 let task: taskpool.Task = new taskpool.Task(add, 1, 2); 87 console.info("taskpool res is: " + await taskpool.execute(task)); 88 } catch (e) { 89 console.error("taskpool execute error is: " + e); 90 } 91} 92 93@Entry 94@Component 95struct Index { 96 @State message: string = 'Hello World' 97 98 build() { 99 Row() { 100 Column() { 101 Text(this.message) 102 .fontSize(50) 103 .fontWeight(FontWeight.Bold) 104 .onClick(() => { 105 ConcurrentFunc(); 106 }) 107 } 108 .width('100%') 109 } 110 .height('100%') 111 } 112} 113``` 114<!--@[concurrent_taskpool_common_usage](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/generaluse.ets)--> 115 116#### 并发函数返回Promise 117 118并发函数中返回Promise的表现需关注,如下例所示,其中testPromise、testPromise1等并发同步函数会处理该Promise并返回结果。 119 120示例: 121 122```ts 123import { taskpool } from '@kit.ArkTS'; 124 125@Concurrent 126function testPromise(args1: number, args2: number): Promise<number> { 127 return new Promise<number>((resolve, reject)=>{ 128 resolve(args1 + args2); 129 }); 130} 131 132@Concurrent 133async function testPromise1(args1: number, args2: number): Promise<number> { 134 return new Promise<number>((resolve, reject)=>{ 135 resolve(args1 + args2); 136 }); 137} 138 139@Concurrent 140async function testPromise2(args1: number, args2: number): Promise<number> { 141 return await new Promise<number>((resolve, reject)=>{ 142 resolve(args1 + args2); 143 }); 144} 145 146@Concurrent 147function testPromise3() { 148 return Promise.resolve(1); 149} 150 151@Concurrent 152async function testPromise4(): Promise<number> { 153 return 1; 154} 155 156@Concurrent 157async function testPromise5(): Promise<string> { 158 return await new Promise((resolve) => { 159 setTimeout(()=>{ 160 resolve("Promise setTimeout after resolve"); 161 }, 1000) 162 }); 163} 164 165async function testConcurrentFunc() { 166 let task1: taskpool.Task = new taskpool.Task(testPromise, 1, 2); 167 let task2: taskpool.Task = new taskpool.Task(testPromise1, 1, 2); 168 let task3: taskpool.Task = new taskpool.Task(testPromise2, 1, 2); 169 let task4: taskpool.Task = new taskpool.Task(testPromise3); 170 let task5: taskpool.Task = new taskpool.Task(testPromise4); 171 let task6: taskpool.Task = new taskpool.Task(testPromise5); 172 173 taskpool.execute(task1).then((d:object)=>{ 174 console.info("task1 res is: " + d); 175 }).catch((e:object)=>{ 176 console.info("task1 catch e: " + e); 177 }) 178 taskpool.execute(task2).then((d:object)=>{ 179 console.info("task2 res is: " + d); 180 }).catch((e:object)=>{ 181 console.info("task2 catch e: " + e); 182 }) 183 taskpool.execute(task3).then((d:object)=>{ 184 console.info("task3 res is: " + d); 185 }).catch((e:object)=>{ 186 console.info("task3 catch e: " + e); 187 }) 188 taskpool.execute(task4).then((d:object)=>{ 189 console.info("task4 res is: " + d); 190 }).catch((e:object)=>{ 191 console.info("task4 catch e: " + e); 192 }) 193 taskpool.execute(task5).then((d:object)=>{ 194 console.info("task5 res is: " + d); 195 }).catch((e:object)=>{ 196 console.info("task5 catch e: " + e); 197 }) 198 taskpool.execute(task6).then((d:object)=>{ 199 console.info("task6 res is: " + d); 200 }).catch((e:object)=>{ 201 console.info("task6 catch e: " + e); 202 }) 203} 204 205@Entry 206@Component 207struct Index { 208 @State message: string = 'Hello World'; 209 210 build() { 211 Row() { 212 Column() { 213 Button(this.message) 214 .fontSize(50) 215 .fontWeight(FontWeight.Bold) 216 .onClick(() => { 217 testConcurrentFunc(); 218 }) 219 } 220 .width('100%') 221 } 222 .height('100%') 223 } 224} 225``` 226<!--@[concurrent_taskpool_promise_return](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/returnpromise.ets)--> 227 228#### 并发函数中使用自定义类或函数 229 230并发函数中使用自定义类或函数时需定义在不同文件,否则会被认为是闭包,如下例所示。 231 232示例: 233 234```ts 235// Index.ets 236import { taskpool } from '@kit.ArkTS'; 237import { BusinessError } from '@kit.BasicServicesKit'; 238import { testAdd, MyTestA, MyTestB } from './Test'; 239 240function add(arg: number) { 241 return ++arg; 242} 243 244class TestA { 245 constructor(name: string) { 246 this.name = name; 247 } 248 name: string = 'ClassA'; 249} 250 251class TestB { 252 static nameStr: string = 'ClassB'; 253} 254 255@Concurrent 256function TestFunc() { 257 // case1:在并发函数中直接调用同文件内定义的类或函数 258 259 // 直接调用同文件定义的函数add(),add飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 260 // add(1); 261 // 直接使用同文件定义的TestA构造,TestA飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 262 // let a = new TestA("aaa"); 263 // 直接访问同文件定义的TestB的成员nameStr,TestB飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck> 264 // console.info("TestB name is: " + TestB.nameStr); 265 266 // case2:在并发函数中调用定义在Test.ets文件并导入当前文件的类或函数 267 268 // 输出结果:res1 is: 2 269 console.info("res1 is: " + testAdd(1)); 270 let tmpStr = new MyTestA("TEST A"); 271 // 输出结果:res2 is: TEST A 272 console.info("res2 is: " + tmpStr.name); 273 // 输出结果:res3 is: MyTestB 274 console.info("res3 is: " + MyTestB.nameStr); 275} 276 277 278@Entry 279@Component 280struct Index { 281 @State message: string = 'Hello World'; 282 283 build() { 284 RelativeContainer() { 285 Text(this.message) 286 .id('HelloWorld') 287 .fontSize(50) 288 .fontWeight(FontWeight.Bold) 289 .alignRules({ 290 center: { anchor: '__container__', align: VerticalAlign.Center }, 291 middle: { anchor: '__container__', align: HorizontalAlign.Center } 292 }) 293 .onClick(() => { 294 let task = new taskpool.Task(TestFunc); 295 taskpool.execute(task).then(() => { 296 console.info("taskpool: execute task success!"); 297 }).catch((e:BusinessError) => { 298 console.error(`taskpool: execute: Code: ${e.code}, message: ${e.message}`); 299 }) 300 }) 301 } 302 .height('100%') 303 .width('100%') 304 } 305} 306``` 307<!--@[quickstart1](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/generaluse.ets)--> 308 309```ts 310// Test.ets 311export function testAdd(arg: number) { 312 return ++arg; 313} 314 315@Sendable 316export class MyTestA { 317 constructor(name: string) { 318 this.name = name; 319 } 320 name: string = 'MyTestA'; 321} 322 323export class MyTestB { 324 static nameStr:string = 'MyTestB'; 325} 326``` 327<!--@[quickstart](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/generaluse.ets)--> 328 329#### 并发异步函数中使用Promise 330 331并发异步函数中如果使用Promise,建议搭配await使用。这样TaskPool会捕获Promise中可能发生的异常。推荐使用示例如下。 332 333示例: 334 335```ts 336import { taskpool } from '@kit.ArkTS' 337 338@Concurrent 339async function testPromiseError() { 340 await new Promise<number>((resolve, reject) => { 341 resolve(1); 342 }).then(()=>{ 343 throw new Error("testPromise Error"); 344 }) 345} 346 347@Concurrent 348async function testPromiseError1() { 349 await new Promise<string>((resolve, reject) => { 350 reject("testPromiseError1 Error msg"); 351 }) 352} 353 354@Concurrent 355function testPromiseError2() { 356 return new Promise<string>((resolve, reject) => { 357 reject("testPromiseError2 Error msg"); 358 }) 359} 360 361async function testConcurrentFunc() { 362 let task1: taskpool.Task = new taskpool.Task(testPromiseError); 363 let task2: taskpool.Task = new taskpool.Task(testPromiseError1); 364 let task3: taskpool.Task = new taskpool.Task(testPromiseError2); 365 366 taskpool.execute(task1).then((d:object)=>{ 367 console.info("task1 res is: " + d); 368 }).catch((e:object)=>{ 369 console.info("task1 catch e: " + e); 370 }) 371 taskpool.execute(task2).then((d:object)=>{ 372 console.info("task2 res is: " + d); 373 }).catch((e:object)=>{ 374 console.info("task2 catch e: " + e); 375 }) 376 taskpool.execute(task3).then((d:object)=>{ 377 console.info("task3 res is: " + d); 378 }).catch((e:object)=>{ 379 console.info("task3 catch e: " + e); 380 }) 381} 382 383@Entry 384@Component 385struct Index { 386 @State message: string = 'Hello World'; 387 388 build() { 389 Row() { 390 Column() { 391 Button(this.message) 392 .fontSize(50) 393 .fontWeight(FontWeight.Bold) 394 .onClick(() => { 395 testConcurrentFunc(); 396 }) 397 } 398 .width('100%') 399 } 400 .height('100%') 401 } 402} 403``` 404<!--@[concurrent_taskpool_async_promise_usage](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/asynchronousfunctions.ets)-->