• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# TaskPool简介
2<!--Kit: ArkTS-->
3<!--Subsystem: CommonLibrary-->
4<!--Owner: @wang_zhaoyong-->
5<!--Designer: @weng-changcheng-->
6<!--Tester: @kirl75; @zsw_zhushiwei-->
7<!--Adviser: @ge-yafang-->
8
9TaskPool为应用程序提供多线程环境,降低资源消耗并提高系统性能。无需管理线程生命周期。具体接口信息及使用方法,请参见[TaskPool](../reference/apis-arkts/js-apis-taskpool.md)。
10
11## TaskPool运作机制
12
13TaskPool运作机制示意图
14
15![zh-cn_image_0000001964858368](figures/zh-cn_image_0000001964858368.png)
16
17TaskPool支持在宿主线程提交任务到任务队列,系统选择合适的工作线程执行任务,并将结果返回给宿主线程。接口易用,支持任务执行、取消和指定优先级。通过系统统一线程管理,结合动态调度和负载均衡算法,可以节约系统资源。系统默认启动一个任务工作线程,任务多时会自动扩容。工作线程数量上限由设备的物理核数决定,内部管理具体数量,确保调度和执行效率最优。长时间无任务分发时会缩容,减少工作线程数量。具体扩缩容机制请参见[TaskPool扩缩容机制](taskpool-introduction.md#taskpool扩缩容机制)。
18
19## TaskPool注意事项
20
21- 实现任务的函数需要使用[@Concurrent装饰器](#concurrent装饰器)标注,且仅支持在.ets文件中使用。
22
23- 从API version 11开始,跨并发实例传递带方法的实例对象时,该类必须使用装饰器[@Sendable装饰器](arkts-sendable.md#sendable装饰器)标注,且仅支持在.ets文件中使用。
24
25- 任务函数([LongTask](../reference/apis-arkts/js-apis-taskpool.md#longtask12)除外)在TaskPool工作线程中的执行时长不能超过3分钟。否则,任务将被强制终止。需要注意的是,这里的3分钟限制仅统计TaskPool线程的​​同步运行时长​​,不包含异步操作(如Promise或async/await)的等待时长。例如,数据库的插入、删除、更新等操作,如果是异步操作,仅计入CPU实际处理时长(如SQL解析),网络传输或磁盘I/O等待时长不计入;如果是同步操作,整个操作时长(含I/O阻塞时间)均计入限制。开发者可通过[Task](../reference/apis-arkts/js-apis-taskpool.md#task)的属性ioDuration、cpuDuration获取执行当前任务的异步IO耗时和CPU耗时。
26
27- 实现任务的函数入参需满足序列化支持的类型。详情请参见[线程间通信对象](interthread-communication-overview.md)。目前不支持使用[@State装饰器](../ui/state-management/arkts-state.md)、[@Prop装饰器](../ui/state-management/arkts-prop.md)、[@Link装饰器](../ui/state-management/arkts-link.md)等装饰器修饰的复杂类型。
28
29- ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口[setTransferList()](../reference/apis-arkts/js-apis-taskpool.md#settransferlist10)设置。如果需要多次调用使用ArrayBuffer作为参数的task,则需要通过接口[setCloneList()](../reference/apis-arkts/js-apis-taskpool.md#setclonelist11)把ArrayBuffer在线程中的传输行为改成拷贝传递,避免对原有对象产生影响。
30
31除上述注意事项外,使用TaskPool时还需注意[并发注意事项](multi-thread-concurrency-overview.md#并发注意事项)。
32
33  ```ts
34  import { taskpool } from '@kit.ArkTS';
35  import { BusinessError } from '@kit.BasicServicesKit';
36
37  @Concurrent
38  function printArrayBuffer(buffer: ArrayBuffer) {
39    return buffer;
40  }
41
42  function testArrayBuffer() {
43    const buffer = new ArrayBuffer(1);
44    const group = new taskpool.TaskGroup();
45    const task = new taskpool.Task(printArrayBuffer, buffer);
46    group.addTask(task);
47    task.setCloneList([buffer]);
48    for (let i = 0; i < 5; i++) {
49      taskpool.execute(group).then(() => {
50        console.info('execute group success');
51      }).catch((e: BusinessError) => {
52        console.error(`execute group error: ${e.message}`);
53      })
54    }
55  }
56  ```
57
58- 由于不同线程中上下文对象不同,TaskPool工作线程只能使用线程安全的库。例如,不能使用UI相关的非线程安全库。
59
60- 序列化传输的数据量限制为16MB。
61
62- [Priority](../reference/apis-arkts/js-apis-taskpool.md#priority)的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级的任务只在所有线程都空闲时触发执行,并且同一时间只会有一个IDLE优先级的任务执行。
63
64- Promise不支持跨线程传递。TaskPool返回pending或rejected状态的Promise时会失败,返回fulfilled状态的Promise时TaskPool会解析返回的结果,如果结果可以跨线程传递,则返回成功。
65
66- 不支持在TaskPool工作线程中使用[AppStorage](../ui/state-management/arkts-appstorage.md)。
67
68- TaskPool支持在宿主线程封装任务并提交给任务队列,理论上支持的任务数量没有上限。然而,任务的执行效率受限于任务的优先级和系统资源。当工作线程达到最大数量时,任务的执行效率可能会下降
69
70- TaskPool不支持指定任务所运行的线程,任务会被分配到空闲的线程中执行。如果需要指定任务所运行的线程,建议使用[Worker](worker-introduction.md)。
71
72## \@Concurrent装饰器
73
74使用[TaskPool](../reference/apis-arkts/js-apis-taskpool.md)时,执行的并发函数必须用该装饰器修饰,否则无法通过校验。
75
76> **说明:**
77>
78> 从API version 9开始,支持使用\@Concurrent装饰器声明并校验并发函数。
79
80| \@Concurrent并发装饰器 | 说明 |
81| -------- | -------- |
82| 装饰器参数 | 无。 |
83| 使用场景 | 仅支持在Stage模型的工程中使用。仅支持在.ets文件中使用。 |
84| 装饰的函数类型 | 允许标注为async函数或普通函数。禁止标注为generator、箭头函数、类方法。不支持类成员函数或者匿名函数。 |
85| 装饰的函数内的变量类型 | 允许使用局部变量、入参和通过import引入的变量,禁止使用闭包变量。 |
86| 装饰的函数内的返回值类型 | 支持的类型请查[线程间通信对象](interthread-communication-overview.md)。 |
87
88> **说明:**
89>
90> 由于\@Concurrent标记的函数不能访问闭包,因此函数内部不能调用当前文件的其他函数,例如:
91>
92> ```ts
93> function bar() {
94> }
95>
96> @Concurrent
97> function foo() {
98>   bar(); // 违反闭包原则,报错
99> }
100> ```
101
102## 装饰器使用示例
103
104### 并发函数一般使用
105
106并发函数为一个计算两数之和的普通函数,taskpool执行该函数并返回结果。
107
108示例:
109
110```ts
111import { taskpool } from '@kit.ArkTS';
112
113@Concurrent
114function add(num1: number, num2: number): number {
115  return num1 + num2;
116}
117
118async function concurrentFunc(): Promise<void> {
119  try {
120    const task: taskpool.Task = new taskpool.Task(add, 1, 2);
121    console.info(`taskpool res is: ${await taskpool.execute(task)}`); // 输出结果:taskpool res is: 3
122  } catch (e) {
123    console.error(`taskpool execute error is: ${e}`);
124  }
125}
126
127@Entry
128@Component
129struct Index {
130  @State message: string = 'Hello World';
131
132  build() {
133    Row() {
134      Column() {
135        Text(this.message)
136          .fontSize(50)
137          .fontWeight(FontWeight.Bold)
138          .onClick(() => {
139            concurrentFunc();
140          })
141      }
142      .width('100%')
143    }
144    .height('100%')
145  }
146}
147```
148<!-- @[concurrent_taskpool_common_usage](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/generaluse.ets) -->
149
150### 并发函数返回Promise
151
152在并发函数中返回Promise时需特别注意。如示例所示,testPromise和testPromise1等函数需处理Promise并返回结果。
153
154示例:
155
156```ts
157import { taskpool } from '@kit.ArkTS';
158
159@Concurrent
160function testPromise(args1: number, args2: number): Promise<number> {
161  return new Promise<number>((resolve, reject) => {
162    resolve(args1 + args2);
163  });
164}
165
166@Concurrent
167async function testPromise1(args1: number, args2: number): Promise<number> {
168  return new Promise<number>((resolve, reject) => {
169    resolve(args1 + args2);
170  });
171}
172
173@Concurrent
174async function testPromise2(args1: number, args2: number): Promise<number> {
175  return await new Promise<number>((resolve, reject) => {
176    resolve(args1 + args2);
177  });
178}
179
180@Concurrent
181function testPromise3() {
182  return Promise.resolve(1);
183}
184
185@Concurrent
186async function testPromise4(): Promise<number> {
187  return 1;
188}
189
190@Concurrent
191async function testPromise5(): Promise<string> {
192  return await new Promise((resolve) => {
193    setTimeout(() => {
194      resolve('Promise setTimeout after resolve');
195    }, 1000)
196  });
197}
198
199async function testConcurrentFunc() {
200  const task1: taskpool.Task = new taskpool.Task(testPromise, 1, 2);
201  const task2: taskpool.Task = new taskpool.Task(testPromise1, 1, 2);
202  const task3: taskpool.Task = new taskpool.Task(testPromise2, 1, 2);
203  const task4: taskpool.Task = new taskpool.Task(testPromise3);
204  const task5: taskpool.Task = new taskpool.Task(testPromise4);
205  const task6: taskpool.Task = new taskpool.Task(testPromise5);
206
207  taskpool.execute(task1).then((d: object) => {
208    console.info(`task1 res is: ${d}`); // 输出结果:task1 res is: 3
209  }).catch((e: object) => {
210    console.error(`task1 catch e: ${e}`);
211  })
212  taskpool.execute(task2).then((d: object) => {
213    console.info(`task2 res is: ${d}`);
214  }).catch((e: object) => {
215    console.error(`task2 catch e: ${e}`); // 输出结果:task2 catch e: Error: Can't return Promise in pending state
216  })
217  taskpool.execute(task3).then((d: object) => {
218    console.info(`task3 res is: ${d}`); // 输出结果:task3 res is: 3
219  }).catch((e: object) => {
220    console.error(`task3 catch e: ${e}`);
221  })
222  taskpool.execute(task4).then((d: object) => {
223    console.info(`task4 res is: ${d}`); // 输出结果:task4 res is: 1
224  }).catch((e: object) => {
225    console.error(`task4 catch e: ${e}`);
226  })
227  taskpool.execute(task5).then((d: object) => {
228    console.info(`task5 res is: ${d}`); // 输出结果:task5 res is: 1
229  }).catch((e: object) => {
230    console.error(`task5 catch e: ${e}`);
231  })
232  taskpool.execute(task6).then((d: object) => {
233    console.info(`task6 res is: ${d}`); // 输出结果:task6 res is: Promise setTimeout after resolve
234  }).catch((e: object) => {
235    console.error(`task6 catch e: ${e}`);
236  })
237}
238
239@Entry
240@Component
241struct Index {
242  @State message: string = 'Hello World';
243
244  build() {
245    Row() {
246      Column() {
247        Button(this.message)
248          .fontSize(50)
249          .fontWeight(FontWeight.Bold)
250          .onClick(() => {
251            testConcurrentFunc();
252          })
253      }
254      .width('100%')
255    }
256    .height('100%')
257  }
258}
259```
260<!-- @[concurrent_taskpool_promise_return](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/returnpromise.ets) -->
261
262### 并发函数中使用自定义类或函数
263
264在并发函数中使用自定义类或函数时,需将其定义在单独的文件中,否则可能被视为闭包。如下示例所示。
265
266示例:
267
268```ts
269// Index.ets
270import { taskpool } from '@kit.ArkTS';
271import { BusinessError } from '@kit.BasicServicesKit';
272import { testAdd, MyTestA, MyTestB } from './Test';
273
274function add(arg: number) {
275  return ++arg;
276}
277
278class TestA {
279  constructor(name: string) {
280    this.name = name;
281  }
282
283  name: string = 'ClassA';
284}
285
286class TestB {
287  static nameStr: string = 'ClassB';
288}
289
290@Concurrent
291function TestFunc() {
292  // case1:在并发函数中直接调用同文件内定义的类或函数
293
294  // 直接调用同文件定义的函数add(),add飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
295  // add(1);
296  // 直接使用同文件定义的TestA构造,TestA飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
297  // const a = new TestA('aaa');
298  // 直接访问同文件定义的TestB的成员nameStr,TestB飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
299  // console.info(`TestB name is: ${TestB.nameStr}`);
300
301  // case2:在并发函数中调用定义在Test.ets文件并导入当前文件的类或函数
302
303  // 输出结果:res1 is: 2
304  console.info(`res1 is: ${testAdd(1)}`);
305  const tmpStr = new MyTestA('TEST A');
306  // 输出结果:res2 is: TEST A
307  console.info(`res2 is: ${tmpStr.name}`);
308  // 输出结果:res3 is: MyTestB
309  console.info(`res3 is: ${MyTestB.nameStr}`);
310}
311
312
313@Entry
314@Component
315struct Index {
316  @State message: string = 'Hello World';
317
318  build() {
319    RelativeContainer() {
320      Text(this.message)
321        .id('HelloWorld')
322        .fontSize(50)
323        .fontWeight(FontWeight.Bold)
324        .alignRules({
325          center: { anchor: '__container__', align: VerticalAlign.Center },
326          middle: { anchor: '__container__', align: HorizontalAlign.Center }
327        })
328        .onClick(() => {
329          const task = new taskpool.Task(TestFunc);
330          taskpool.execute(task).then(() => {
331            console.info('taskpool: execute task success!');
332          }).catch((e: BusinessError) => {
333            console.error(`taskpool: execute: Code: ${e.code}, message: ${e.message}`);
334          })
335        })
336    }
337    .height('100%')
338    .width('100%')
339  }
340}
341```
342<!-- @[concurrent_taskpool_custom_class_function](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/customclasses.ets) -->
343
344```ts
345// Test.ets
346export function testAdd(arg: number) {
347  return ++arg;
348}
349
350@Sendable
351export class MyTestA {
352  constructor(name: string) {
353    this.name = name;
354  }
355  name: string = 'MyTestA';
356}
357
358export class MyTestB {
359  static nameStr:string = 'MyTestB';
360}
361```
362<!-- @[concurrent_taskpool_test_resources](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/Test.ets) -->
363
364### 并发异步函数中使用Promise
365
366在并发异步函数中使用Promise时,建议搭配await使用,这样TaskPool可以捕获Promise中的异常。推荐使用示例如下。
367
368示例:
369
370```ts
371import { taskpool } from '@kit.ArkTS';
372
373@Concurrent
374async function testPromiseError() {
375  await new Promise<number>((resolve, reject) => {
376    resolve(1);
377  }).then(() => {
378    throw new Error('testPromise error');
379  })
380}
381
382@Concurrent
383async function testPromiseError1() {
384  await new Promise<string>((resolve, reject) => {
385    reject('testPromiseError1 error msg');
386  })
387}
388
389@Concurrent
390function testPromiseError2() {
391  return new Promise<string>((resolve, reject) => {
392    reject('testPromiseError2 error msg');
393  })
394}
395
396async function testConcurrentFunc() {
397  const task1: taskpool.Task = new taskpool.Task(testPromiseError);
398  const task2: taskpool.Task = new taskpool.Task(testPromiseError1);
399  const task3: taskpool.Task = new taskpool.Task(testPromiseError2);
400
401  taskpool.execute(task1).then((d: object) => {
402    console.info(`task1 res is: ${d}`);
403  }).catch((e: object) => {
404    console.error(`task1 catch e: ${e}`); // task1 catch e: Error: testPromise error
405  })
406  taskpool.execute(task2).then((d: object) => {
407    console.info(`task2 res is: ${d}`);
408  }).catch((e: object) => {
409    console.error(`task2 catch e: ${e}`); // task2 catch e: testPromiseError1 error msg
410  })
411  taskpool.execute(task3).then((d: object) => {
412    console.info(`task3 res is: ${d}`);
413  }).catch((e: object) => {
414    console.error(`task3 catch e: ${e}`); // task3 catch e: testPromiseError2 error msg
415  })
416}
417
418@Entry
419@Component
420struct Index {
421  @State message: string = 'Hello World';
422
423  build() {
424    Row() {
425      Column() {
426        Button(this.message)
427          .fontSize(50)
428          .fontWeight(FontWeight.Bold)
429          .onClick(() => {
430            testConcurrentFunc();
431          })
432      }
433      .width('100%')
434    }
435    .height('100%')
436  }
437}
438```
439<!-- @[concurrent_taskpool_async_promise_usage](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/asynchronousfunctions.ets) -->
440
441## TaskPool扩缩容机制
442
443### 扩容机制
444
445一般情况下,向任务队列提交任务时会触发扩容检测。扩容检测首先判断当前空闲工作线程数是否大于任务数。如果大于,说明线程池中有空闲工作线程,无需扩容。否则,通过负载计算确定所需工作线程数并创建。
446
447### 缩容机制
448
449扩容后,TaskPool创建多个工作线程,但当任务数减少后,这些线程就会处于空闲状态,造成资源浪费,因此,TaskPool提供了缩容机制。TaskPool使用定时器,每30秒检测一次当前负载,并尝试释放空闲的工作线程。释放的线程需满足以下条件:
450
451- 该线程空闲时长达到30s。
452
453- 该线程上未执行长时任务([LongTask](../reference/apis-arkts/js-apis-taskpool.md#longtask12))。
454
455- 该线程上没有业务申请且未释放的句柄,例如[Timer(定时器)](../reference/common/js-apis-timer.md)。
456
457- 该线程处于非调试调优阶段。
458
459- 该线程中不存在已创建未销毁的子Worker。