• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# TaskPool简介
2
3TaskPool为应用程序提供多线程环境,降低资源消耗、提高系统性能,无需管理线程生命周期。具体接口信息及使用方法详情请见[TaskPool](../reference/apis-arkts/js-apis-taskpool.md)。
4
5## TaskPool运作机制
6
7TaskPool运作机制示意图
8
9![zh-cn_image_0000001964858368](figures/zh-cn_image_0000001964858368.png)
10
11TaskPool支持开发者在宿主线程提交任务到任务队列,系统选择合适的工作线程执行任务,再将结果返回给宿主线程。接口易用,支持任务执行、取消和指定优先级,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认启动一个任务工作线程,任务多时会扩容。工作线程数量上限取决于设备的物理核数,内部管理具体数量,确保调度和执行效率最优。长时间无任务分发时会缩容,减少工作线程数量。具体扩缩容机制详情请见[TaskPool扩缩容机制](taskpool-introduction.md#taskpool扩缩容机制)。
12
13## TaskPool注意事项
14
15- 实现任务的函数需要使用[@Concurrent装饰器](#concurrent装饰器)标注,且仅支持在.ets文件中使用。
16
17- 从API version 11开始,跨并发实例传递带方法的实例对象时,该类必须使用装饰器[@Sendable装饰器](arkts-sendable.md#sendable装饰器)标注,且仅支持在.ets文件中使用。
18
19- 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时)。否则,任务将被强制终止。
20
21- 实现任务的函数入参需满足序列化支持的类型,详情请参见[线程间通信对象](interthread-communication-overview.md)。目前不支持使用[@State装饰器](../ui/state-management/arkts-state.md)、[@Prop装饰器](../ui/state-management/arkts-prop.md)、[@Link装饰器](../ui/state-management/arkts-link.md)等装饰器修饰的复杂类型。
22
23- ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口[setTransferList()](../reference/apis-arkts/js-apis-taskpool.md#settransferlist10)设置。如果需要多次调用使用ArrayBuffer作为参数的task,则需要通过接口[setCloneList()](../reference/apis-arkts/js-apis-taskpool.md#setclonelist11)把ArrayBuffer在线程中的传输行为改成拷贝传递,避免对原有对象产生影响。
24
25  ```ts
26  import { taskpool } from '@kit.ArkTS';
27  import { BusinessError } from '@kit.BasicServicesKit';
28
29  @Concurrent
30  function printArrayBuffer(buffer:ArrayBuffer) {
31    return buffer
32  }
33
34  function testArrayBuffer() {
35    let buffer = new ArrayBuffer(1);
36    let group = new taskpool.TaskGroup();
37    let task = new taskpool.Task(printArrayBuffer, buffer);
38    group.addTask(task);
39    task.setCloneList([buffer]);
40    for (let i = 0; i < 5; i++) {
41      taskpool.execute(group).then(() => {
42        console.info("execute group success");
43      }).catch((e: BusinessError) => {
44        console.error("execute group error: " + e.message);
45      })
46    }
47  }
48  ```
49
50- 由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。
51
52- 序列化传输的数据量限制为16MB。
53
54- [Priority](../reference/apis-arkts/js-apis-taskpool.md#priority)的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级的任务只在所有线程都空闲时触发执行,并且只会占用一个线程。
55
56- Promise不支持跨线程传递。TaskPool返回pending或rejected状态的Promise时失败,返回fulfilled状态的Promise时TaskPool会解析返回的结果,如果结果可以跨线程传递,则返回成功。
57
58- 不支持在TaskPool工作线程中使用[AppStorage](../ui/state-management/arkts-appstorage.md)。
59
60- TaskPool支持开发者在宿主线程封装任务并提交给任务队列,理论上支持的任务数量没有上限。然而,任务的执行效率受限于任务的优先级和系统资源。当工作线程达到最大数量时,任务的执行效率可能会下降。
61
62- TaskPool不支持指定任务所运行的线程,任务会被分配到空闲的线程中执行。如果需要指定任务所运行的线程,建议使用[Worker](worker-introduction.md)。
63
64## \@Concurrent装饰器
65
66在使用[TaskPool](../reference/apis-arkts/js-apis-taskpool.md)时,执行的并发函数需要使用该装饰器修饰,否则无法通过相关校验。
67
68> **说明:**
69>
70> 从API version 9开始,支持使用\@Concurrent装饰器声明并校验并发函数。
71
72### 装饰器说明
73
74| \@Concurrent并发装饰器 | 说明 |
75| -------- | -------- |
76| 装饰器参数 | 无。 |
77| 使用场景 | 仅支持在Stage模型的工程中使用。仅支持在.ets文件中使用。 |
78| 装饰的函数类型 | 允许标注async函数或普通函数。禁止标注generator、箭头函数、类方法。不支持类成员函数或者匿名函数。 |
79| 装饰的函数内的变量类型 | 允许使用local变量、入参和通过import引入的变量。禁止使用闭包变量。 |
80| 装饰的函数内的返回值类型 | 支持的类型请查[线程间通信对象](interthread-communication-overview.md)。 |
81
82> **说明:**
83>
84> 由于\@Concurrent标记的函数不能访问闭包,因此函数内部不能调用当前文件的其他函数,例如:
85>
86> ```ts
87> function bar() {
88> }
89>
90> @Concurrent
91> function foo() {
92> bar(); // 违反闭包原则,报错
93> }
94> ```
95
96### 装饰器使用示例
97
98#### 并发函数一般使用
99
100并发函数为一个计算两数之和的普通函数,taskpool执行该函数并返回结果。
101
102示例:
103
104```ts
105import { taskpool } from '@kit.ArkTS';
106
107@Concurrent
108function add(num1: number, num2: number): number {
109  return num1 + num2;
110}
111
112async function ConcurrentFunc(): Promise<void> {
113  try {
114    let task: taskpool.Task = new taskpool.Task(add, 1, 2);
115    console.info("taskpool res is: " + await taskpool.execute(task));
116  } catch (e) {
117    console.error("taskpool execute error is: " + e);
118  }
119}
120
121@Entry
122@Component
123struct Index {
124  @State message: string = 'Hello World'
125
126  build() {
127    Row() {
128      Column() {
129        Text(this.message)
130          .fontSize(50)
131          .fontWeight(FontWeight.Bold)
132          .onClick(() => {
133            ConcurrentFunc();
134          })
135      }
136      .width('100%')
137    }
138    .height('100%')
139  }
140}
141```
142
143#### 并发函数返回Promise
144
145并发函数中返回Promise时需要特别关注。如示例所示,testPromise和testPromise1等并发同步函数会处理该Promise并返回结果。
146
147示例:
148
149```ts
150import { taskpool } from '@kit.ArkTS';
151
152@Concurrent
153function testPromise(args1: number, args2: number): Promise<number> {
154  return new Promise<number>((resolve, reject)=>{
155    resolve(args1 + args2);
156  });
157}
158
159@Concurrent
160async function testPromise1(args1: number, args2: number): Promise<number> {
161  return new Promise<number>((resolve, reject)=>{
162    resolve(args1 + args2);
163  });
164}
165
166@Concurrent
167async function testPromise2(args1: number, args2: number): Promise<number> {
168  return await new Promise<number>((resolve, reject)=>{
169    resolve(args1 + args2);
170  });
171}
172
173@Concurrent
174function testPromise3() {
175  return Promise.resolve(1);
176}
177
178@Concurrent
179async function testPromise4(): Promise<number> {
180  return 1;
181}
182
183@Concurrent
184async function testPromise5(): Promise<string> {
185  return await new Promise((resolve) => {
186    setTimeout(()=>{
187      resolve("Promise setTimeout after resolve");
188    }, 1000)
189  });
190}
191
192async function testConcurrentFunc() {
193  let task1: taskpool.Task = new taskpool.Task(testPromise, 1, 2);
194  let task2: taskpool.Task = new taskpool.Task(testPromise1, 1, 2);
195  let task3: taskpool.Task = new taskpool.Task(testPromise2, 1, 2);
196  let task4: taskpool.Task = new taskpool.Task(testPromise3);
197  let task5: taskpool.Task = new taskpool.Task(testPromise4);
198  let task6: taskpool.Task = new taskpool.Task(testPromise5);
199
200  taskpool.execute(task1).then((d:object)=>{
201    console.info("task1 res is: " + d);
202  }).catch((e:object)=>{
203    console.info("task1 catch e: " + e);
204  })
205  taskpool.execute(task2).then((d:object)=>{
206    console.info("task2 res is: " + d);
207  }).catch((e:object)=>{
208    console.info("task2 catch e: " + e);
209  })
210  taskpool.execute(task3).then((d:object)=>{
211    console.info("task3 res is: " + d);
212  }).catch((e:object)=>{
213    console.info("task3 catch e: " + e);
214  })
215  taskpool.execute(task4).then((d:object)=>{
216    console.info("task4 res is: " + d);
217  }).catch((e:object)=>{
218    console.info("task4 catch e: " + e);
219  })
220  taskpool.execute(task5).then((d:object)=>{
221    console.info("task5 res is: " + d);
222  }).catch((e:object)=>{
223    console.info("task5 catch e: " + e);
224  })
225  taskpool.execute(task6).then((d:object)=>{
226    console.info("task6 res is: " + d);
227  }).catch((e:object)=>{
228    console.info("task6 catch e: " + e);
229  })
230}
231
232@Entry
233@Component
234struct Index {
235  @State message: string = 'Hello World';
236
237  build() {
238    Row() {
239      Column() {
240        Button(this.message)
241          .fontSize(50)
242          .fontWeight(FontWeight.Bold)
243          .onClick(() => {
244            testConcurrentFunc();
245          })
246      }
247      .width('100%')
248    }
249    .height('100%')
250  }
251}
252```
253
254#### 并发函数中使用自定义类或函数
255
256在并发函数中使用自定义类或函数时,需将其定义在不同的文件中,否则会被认为是闭包。如以下示例所示。
257
258示例:
259
260```ts
261// Index.ets
262import { taskpool } from '@kit.ArkTS';
263import { BusinessError } from '@kit.BasicServicesKit';
264import { testAdd, MyTestA, MyTestB } from './Test';
265
266function add(arg: number) {
267  return ++arg;
268}
269
270class TestA {
271  constructor(name: string) {
272    this.name = name;
273  }
274  name: string = 'ClassA';
275}
276
277class TestB {
278  static nameStr: string = 'ClassB';
279}
280
281@Concurrent
282function TestFunc() {
283  // case1:在并发函数中直接调用同文件内定义的类或函数
284
285  // 直接调用同文件定义的函数add(),add飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
286  // add(1);
287  // 直接使用同文件定义的TestA构造,TestA飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
288  // let a = new TestA("aaa");
289  // 直接访问同文件定义的TestB的成员nameStr,TestB飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
290  // console.info("TestB name is: " + TestB.nameStr);
291
292  // case2:在并发函数中调用定义在Test.ets文件并导入当前文件的类或函数
293
294  // 输出结果:res1 is: 2
295  console.info("res1 is: " + testAdd(1));
296  let tmpStr = new MyTestA("TEST A");
297  // 输出结果:res2 is: TEST A
298  console.info("res2 is: " + tmpStr.name);
299  // 输出结果:res3 is: MyTestB
300  console.info("res3 is: " + MyTestB.nameStr);
301}
302
303
304@Entry
305@Component
306struct Index {
307  @State message: string = 'Hello World';
308
309  build() {
310    RelativeContainer() {
311      Text(this.message)
312        .id('HelloWorld')
313        .fontSize(50)
314        .fontWeight(FontWeight.Bold)
315        .alignRules({
316          center: { anchor: '__container__', align: VerticalAlign.Center },
317          middle: { anchor: '__container__', align: HorizontalAlign.Center }
318        })
319        .onClick(() => {
320          let task = new taskpool.Task(TestFunc);
321          taskpool.execute(task).then(() => {
322            console.info("taskpool: execute task success!");
323          }).catch((e:BusinessError) => {
324            console.error(`taskpool: execute: Code: ${e.code}, message: ${e.message}`);
325          })
326        })
327    }
328    .height('100%')
329    .width('100%')
330  }
331}
332```
333
334```ts
335// Test.ets
336export function testAdd(arg: number) {
337  return ++arg;
338}
339
340@Sendable
341export class MyTestA {
342  constructor(name: string) {
343    this.name = name;
344  }
345  name: string = 'MyTestA';
346}
347
348export class MyTestB {
349  static nameStr:string = 'MyTestB';
350}
351```
352
353#### 并发异步函数中使用Promise
354
355在并发异步函数中使用Promise时,建议搭配await使用。这样TaskPool可以捕获Promise中的异常。推荐使用示例如下。
356
357示例:
358
359```ts
360import { taskpool } from '@kit.ArkTS'
361
362@Concurrent
363async function testPromiseError() {
364  await new Promise<number>((resolve, reject) => {
365    resolve(1);
366  }).then(()=>{
367    throw new Error("testPromise Error");
368  })
369}
370
371@Concurrent
372async function testPromiseError1() {
373  await new Promise<string>((resolve, reject) => {
374    reject("testPromiseError1 Error msg");
375  })
376}
377
378@Concurrent
379function testPromiseError2() {
380  return new Promise<string>((resolve, reject) => {
381    reject("testPromiseError2 Error msg");
382  })
383}
384
385async function testConcurrentFunc() {
386  let task1: taskpool.Task = new taskpool.Task(testPromiseError);
387  let task2: taskpool.Task = new taskpool.Task(testPromiseError1);
388  let task3: taskpool.Task = new taskpool.Task(testPromiseError2);
389
390  taskpool.execute(task1).then((d:object)=>{
391    console.info("task1 res is: " + d);
392  }).catch((e:object)=>{
393    console.info("task1 catch e: " + e);
394  })
395  taskpool.execute(task2).then((d:object)=>{
396    console.info("task2 res is: " + d);
397  }).catch((e:object)=>{
398    console.info("task2 catch e: " + e);
399  })
400  taskpool.execute(task3).then((d:object)=>{
401    console.info("task3 res is: " + d);
402  }).catch((e:object)=>{
403    console.info("task3 catch e: " + e);
404  })
405}
406
407@Entry
408@Component
409struct Index {
410  @State message: string = 'Hello World';
411
412  build() {
413    Row() {
414      Column() {
415        Button(this.message)
416          .fontSize(50)
417          .fontWeight(FontWeight.Bold)
418          .onClick(() => {
419            testConcurrentFunc();
420          })
421      }
422      .width('100%')
423    }
424    .height('100%')
425  }
426}
427```
428
429## TaskPool扩缩容机制
430
431### 扩容机制
432
433一般情况下,开发者向任务队列提交任务时会触发扩容检测。扩容检测首先判断当前空闲的工作线程数是否大于任务数,如果大于,说明线程池中存在空闲工作线程,无需扩容。否则,通过负载计算确定所需工作线程数并创建。
434
435### 缩容机制
436
437扩容后,TaskPool新建多个工作线程,但当任务数减少后,这些线程就会处于空闲状态,造成资源浪费,因此TaskPool提供缩容机制。TaskPool使用了定时器,定时检测当前负载。定时器30s触发一次,每次尝试释放空闲的工作线程。释放的线程需要满足如下条件:
438
439- 该线程空闲时长达到30s。
440
441- 该线程上未执行长时任务([LongTask](../reference/apis-arkts/js-apis-taskpool.md#longtask12))。
442
443- 该线程上没有业务申请且未释放的句柄,例如[Timer(定时器)](../reference/common/js-apis-timer.md)。
444
445- 该线程处于非调试调优阶段。
446
447- 该线程中不存在已创建未销毁的子Worker。