• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# TaskPool简介
2
3任务池(TaskPool)作用是为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能,且您无需关心线程实例的生命周期。具体接口信息及使用方法详情请见TaskPool。
4
5## TaskPool运作机制
6
7
8TaskPool支持开发者在宿主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给宿主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的能力,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,具体数量内部管理,保证最优的调度及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。
9
10## TaskPool注意事项
11
12- 实现任务的函数需要使用[@Concurrent装饰器](#concurrent装饰器)标注,且仅支持在.ets文件中使用。
13
14- 从API version 11开始,跨并发实例传递带方法的实例对象时,该类必须使用装饰器@Sendable装饰器标注,且仅支持在.ets文件中使用。
15
16- 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。
17
18- 实现任务的函数入参需满足序列化支持的类型,详情请参见线程间通信对象。
19
20- ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。
21
22- 由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。
23
24- 序列化传输的数据量大小限制为16MB。
25
26-Priority的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级标记的任务只会在所有线程都空闲的情况下触发执行,并且只会占用一个线程来执行。
27
28- Promise不支持跨线程传递,如果TaskPool返回pending或rejected状态的Promise,会返回失败;对于fulfilled状态的Promise,TaskPool会解析返回的结果,如果结果可以跨线程传递,则返回成功。
29
30- 不支持在TaskPool工作线程中使用AppStorage。
31
32- TaskPool支持开发者在宿主线程封装任务抛给任务队列,理论上可以支持任意多的任务,但任务的执行受限于任务的优先级以及系统资源的影响,在工作线程扩容到最大后,可能会导致任务的执行效率下降。
33
34- TaskPool不支持指定任务所运行的线程,任务会被分配到空闲的线程中执行。如果需要指定任务运行的线程,建议使用Worker来实现。
35
36## \@Concurrent装饰器
37
38在使用TaskPool时,执行的并发函数需要使用该装饰器修饰,否则无法通过相关校验。
39
40> **说明:**
41>
42> 从API version 9开始,支持使用\@Concurrent装饰器声明并校验并发函数。
43
44### 装饰器说明
45
46| \@Concurrent并发装饰器 | 说明 |
47| -------- | -------- |
48| 装饰器参数 | 无。 |
49| 使用场景 | 仅支持在Stage模型的工程中使用。仅支持在.ets文件中使用。 |
50| 装饰的函数类型 | 允许标注async函数或普通函数。禁止标注generator、箭头函数、类方法。不支持类成员函数或者匿名函数。 |
51| 装饰的函数内的变量类型 | 允许使用local变量、入参和通过import引入的变量。禁止使用闭包变量。 |
52| 装饰的函数内的返回值类型 | 支持的类型请查线程间通信对象。 |
53
54> **说明:**
55>
56> 由于\@Concurrent标记的函数不能访问闭包,因此\@Concurrent标记的函数内部不能调用当前文件的其他函数,例如:
57>
58> ```ts
59> function bar() {
60> }
61>
62> @Concurrent
63> function foo() {
64> bar(); // 违反闭包原则,报错
65> }
66> ```
67
68### 装饰器使用示例
69
70#### 并发函数一般使用
71
72并发函数为一个计算两数之和的普通函数,taskpool执行该函数并返回结果。
73
74示例:
75
76```ts
77import { taskpool } from '@kit.ArkTS';
78
79@Concurrent
80function add(num1: number, num2: number): number {
81  return num1 + num2;
82}
83
84async function ConcurrentFunc(): Promise<void> {
85  try {
86    let task: taskpool.Task = new taskpool.Task(add, 1, 2);
87    console.info("taskpool res is: " + await taskpool.execute(task));
88  } catch (e) {
89    console.error("taskpool execute error is: " + e);
90  }
91}
92
93@Entry
94@Component
95struct Index {
96  @State message: string = 'Hello World'
97
98  build() {
99    Row() {
100      Column() {
101        Text(this.message)
102          .fontSize(50)
103          .fontWeight(FontWeight.Bold)
104          .onClick(() => {
105            ConcurrentFunc();
106          })
107      }
108      .width('100%')
109    }
110    .height('100%')
111  }
112}
113```
114<!--@[concurrent_taskpool_common_usage](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/generaluse.ets)-->
115
116#### 并发函数返回Promise
117
118并发函数中返回Promise的表现需关注,如下例所示,其中testPromise、testPromise1等并发同步函数会处理该Promise并返回结果。
119
120示例:
121
122```ts
123import { taskpool } from '@kit.ArkTS';
124
125@Concurrent
126function testPromise(args1: number, args2: number): Promise<number> {
127  return new Promise<number>((resolve, reject)=>{
128    resolve(args1 + args2);
129  });
130}
131
132@Concurrent
133async function testPromise1(args1: number, args2: number): Promise<number> {
134  return new Promise<number>((resolve, reject)=>{
135    resolve(args1 + args2);
136  });
137}
138
139@Concurrent
140async function testPromise2(args1: number, args2: number): Promise<number> {
141  return await new Promise<number>((resolve, reject)=>{
142    resolve(args1 + args2);
143  });
144}
145
146@Concurrent
147function testPromise3() {
148  return Promise.resolve(1);
149}
150
151@Concurrent
152async function testPromise4(): Promise<number> {
153  return 1;
154}
155
156@Concurrent
157async function testPromise5(): Promise<string> {
158  return await new Promise((resolve) => {
159    setTimeout(()=>{
160      resolve("Promise setTimeout after resolve");
161    }, 1000)
162  });
163}
164
165async function testConcurrentFunc() {
166  let task1: taskpool.Task = new taskpool.Task(testPromise, 1, 2);
167  let task2: taskpool.Task = new taskpool.Task(testPromise1, 1, 2);
168  let task3: taskpool.Task = new taskpool.Task(testPromise2, 1, 2);
169  let task4: taskpool.Task = new taskpool.Task(testPromise3);
170  let task5: taskpool.Task = new taskpool.Task(testPromise4);
171  let task6: taskpool.Task = new taskpool.Task(testPromise5);
172
173  taskpool.execute(task1).then((d:object)=>{
174    console.info("task1 res is: " + d);
175  }).catch((e:object)=>{
176    console.info("task1 catch e: " + e);
177  })
178  taskpool.execute(task2).then((d:object)=>{
179    console.info("task2 res is: " + d);
180  }).catch((e:object)=>{
181    console.info("task2 catch e: " + e);
182  })
183  taskpool.execute(task3).then((d:object)=>{
184    console.info("task3 res is: " + d);
185  }).catch((e:object)=>{
186    console.info("task3 catch e: " + e);
187  })
188  taskpool.execute(task4).then((d:object)=>{
189    console.info("task4 res is: " + d);
190  }).catch((e:object)=>{
191    console.info("task4 catch e: " + e);
192  })
193  taskpool.execute(task5).then((d:object)=>{
194    console.info("task5 res is: " + d);
195  }).catch((e:object)=>{
196    console.info("task5 catch e: " + e);
197  })
198  taskpool.execute(task6).then((d:object)=>{
199    console.info("task6 res is: " + d);
200  }).catch((e:object)=>{
201    console.info("task6 catch e: " + e);
202  })
203}
204
205@Entry
206@Component
207struct Index {
208  @State message: string = 'Hello World';
209
210  build() {
211    Row() {
212      Column() {
213        Button(this.message)
214          .fontSize(50)
215          .fontWeight(FontWeight.Bold)
216          .onClick(() => {
217            testConcurrentFunc();
218          })
219      }
220      .width('100%')
221    }
222    .height('100%')
223  }
224}
225```
226<!--@[concurrent_taskpool_promise_return](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/returnpromise.ets)-->
227
228#### 并发函数中使用自定义类或函数
229
230并发函数中使用自定义类或函数时需定义在不同文件,否则会被认为是闭包,如下例所示。
231
232示例:
233
234```ts
235// Index.ets
236import { taskpool } from '@kit.ArkTS';
237import { BusinessError } from '@kit.BasicServicesKit';
238import { testAdd, MyTestA, MyTestB } from './Test';
239
240function add(arg: number) {
241  return ++arg;
242}
243
244class TestA {
245  constructor(name: string) {
246    this.name = name;
247  }
248  name: string = 'ClassA';
249}
250
251class TestB {
252  static nameStr: string = 'ClassB';
253}
254
255@Concurrent
256function TestFunc() {
257  // case1:在并发函数中直接调用同文件内定义的类或函数
258
259  // 直接调用同文件定义的函数add(),add飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
260  // add(1);
261  // 直接使用同文件定义的TestA构造,TestA飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
262  // let a = new TestA("aaa");
263  // 直接访问同文件定义的TestB的成员nameStr,TestB飘红报错:Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>
264  // console.info("TestB name is: " + TestB.nameStr);
265
266  // case2:在并发函数中调用定义在Test.ets文件并导入当前文件的类或函数
267
268  // 输出结果:res1 is: 2
269  console.info("res1 is: " + testAdd(1));
270  let tmpStr = new MyTestA("TEST A");
271  // 输出结果:res2 is: TEST A
272  console.info("res2 is: " + tmpStr.name);
273  // 输出结果:res3 is: MyTestB
274  console.info("res3 is: " + MyTestB.nameStr);
275}
276
277
278@Entry
279@Component
280struct Index {
281  @State message: string = 'Hello World';
282
283  build() {
284    RelativeContainer() {
285      Text(this.message)
286        .id('HelloWorld')
287        .fontSize(50)
288        .fontWeight(FontWeight.Bold)
289        .alignRules({
290          center: { anchor: '__container__', align: VerticalAlign.Center },
291          middle: { anchor: '__container__', align: HorizontalAlign.Center }
292        })
293        .onClick(() => {
294          let task = new taskpool.Task(TestFunc);
295          taskpool.execute(task).then(() => {
296            console.info("taskpool: execute task success!");
297          }).catch((e:BusinessError) => {
298            console.error(`taskpool: execute: Code: ${e.code}, message: ${e.message}`);
299          })
300        })
301    }
302    .height('100%')
303    .width('100%')
304  }
305}
306```
307<!--@[quickstart1](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/generaluse.ets)-->
308
309```ts
310// Test.ets
311export function testAdd(arg: number) {
312  return ++arg;
313}
314
315@Sendable
316export class MyTestA {
317  constructor(name: string) {
318    this.name = name;
319  }
320  name: string = 'MyTestA';
321}
322
323export class MyTestB {
324  static nameStr:string = 'MyTestB';
325}
326```
327<!--@[quickstart](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/generaluse.ets)-->
328
329#### 并发异步函数中使用Promise
330
331并发异步函数中如果使用Promise,建议搭配await使用。这样TaskPool会捕获Promise中可能发生的异常。推荐使用示例如下。
332
333示例:
334
335```ts
336import { taskpool } from '@kit.ArkTS'
337
338@Concurrent
339async function testPromiseError() {
340  await new Promise<number>((resolve, reject) => {
341    resolve(1);
342  }).then(()=>{
343    throw new Error("testPromise Error");
344  })
345}
346
347@Concurrent
348async function testPromiseError1() {
349  await new Promise<string>((resolve, reject) => {
350    reject("testPromiseError1 Error msg");
351  })
352}
353
354@Concurrent
355function testPromiseError2() {
356  return new Promise<string>((resolve, reject) => {
357    reject("testPromiseError2 Error msg");
358  })
359}
360
361async function testConcurrentFunc() {
362  let task1: taskpool.Task = new taskpool.Task(testPromiseError);
363  let task2: taskpool.Task = new taskpool.Task(testPromiseError1);
364  let task3: taskpool.Task = new taskpool.Task(testPromiseError2);
365
366  taskpool.execute(task1).then((d:object)=>{
367    console.info("task1 res is: " + d);
368  }).catch((e:object)=>{
369    console.info("task1 catch e: " + e);
370  })
371  taskpool.execute(task2).then((d:object)=>{
372    console.info("task2 res is: " + d);
373  }).catch((e:object)=>{
374    console.info("task2 catch e: " + e);
375  })
376  taskpool.execute(task3).then((d:object)=>{
377    console.info("task3 res is: " + d);
378  }).catch((e:object)=>{
379    console.info("task3 catch e: " + e);
380  })
381}
382
383@Entry
384@Component
385struct Index {
386  @State message: string = 'Hello World';
387
388  build() {
389    Row() {
390      Column() {
391        Button(this.message)
392          .fontSize(50)
393          .fontWeight(FontWeight.Bold)
394          .onClick(() => {
395            testConcurrentFunc();
396          })
397      }
398      .width('100%')
399    }
400    .height('100%')
401  }
402}
403```
404<!--@[concurrent_taskpool_async_promise_usage](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/TaskPoolIntroduction/entry/src/main/ets/managers/asynchronousfunctions.ets)-->