• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# CPU密集型任务开发指导 (TaskPool和Worker)
2<!--Kit: ArkTS-->
3<!--Subsystem: CommonLibrary-->
4<!--Owner: @lijiamin2025-->
5<!--Designer: @weng-changcheng-->
6<!--Tester: @kirl75; @zsw_zhushiwei-->
7<!--Adviser: @ge-yafang-->
8
9
10CPU密集型任务是指需要占用系统资源进行大量计算的任务,这类任务需要长时间运行,会阻塞线程中其他事件的处理,因此不适合在UI主线程中执行。例如图像处理、视频编码、数据分析等。
11
12
13基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。
14
15
16当任务不需要长时间(3分钟)占用后台线程,而是一个个独立的任务时,推荐使用TaskPool,反之推荐使用Worker。
17
18接下来将分别以图像直方图处理和后台长时间模型预测任务为例进行说明。
19
20
21## 使用TaskPool进行图像直方图处理
22
231. 实现图像处理的业务逻辑。
24
252. 对数据进行分段,并通过任务组发起关联任务调度。
26   创建[TaskGroup](../reference/apis-arkts/js-apis-taskpool.md#taskgroup10),通过[addTask()](../reference/apis-arkts/js-apis-taskpool.md#addtask10)添加对应的任务,然后通过[execute()](../reference/apis-arkts/js-apis-taskpool.md#taskpoolexecute10)执行任务组,并指定为[高优先级](../reference/apis-arkts/js-apis-taskpool.md#priority)。在当前任务组所有任务结束后,会将直方图处理结果同时返回。
27
283. 汇总处理结果数组。
29
30```ts
31// Index.ets
32import { taskpool } from '@kit.ArkTS';
33
34@Concurrent
35function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer {
36  // 步骤1: 具体的图像处理操作及其他耗时操作
37  return dataSlice;
38}
39
40function histogramStatistic(pixelBuffer: ArrayBuffer): void {
41  // 步骤2: 分成三段并发调度
42  let number: number = pixelBuffer.byteLength / 3;
43  let buffer1: ArrayBuffer = pixelBuffer.slice(0, number);
44  let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2);
45  let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2);
46
47  let group: taskpool.TaskGroup = new taskpool.TaskGroup();
48  group.addTask(imageProcessing, buffer1);
49  group.addTask(imageProcessing, buffer2);
50  group.addTask(imageProcessing, buffer3);
51
52  taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => {
53    // 步骤3: 结果数组汇总处理
54    console.info('execute group success');
55  })
56}
57
58@Entry
59@Component
60struct Index {
61  @State message: string = 'Hello World'
62
63  build() {
64    Row() {
65      Column() {
66        Text(this.message)
67          .fontSize(50)
68          .fontWeight(FontWeight.Bold)
69          .onClick(() => {
70            let buffer: ArrayBuffer = new ArrayBuffer(24);
71            histogramStatistic(buffer);
72          })
73      }
74      .width('100%')
75    }
76    .height('100%')
77  }
78}
79```
80<!-- @[process_image_histogram](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) -->
81
82
83## 使用Worker进行长时间数据分析
84
85本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。
86
871. DevEco Studio提供了Worker创建的模板,创建一个Worker线程,例如命名为“MyWorker”。
88
89   ![newWorker](figures/newWorker.png)
90
912. 在宿主线程中首先调用ThreadWorker的[constructor()](../reference/apis-arkts/js-apis-worker.md#constructor9)方法创建Worker对象;然后通过注册[onmessage()](../reference/apis-arkts/js-apis-worker.md#属性-1)回调接收Worker线程发送过来的消息;最后通过调用[postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9)方法向Worker线程发送消息。
92  例如,向Worker线程发送训练和预测的消息,并接收Worker线程发送回来的消息。
93
94    ```ts
95    // Index.ets
96    import { ErrorEvent, MessageEvents, worker } from '@kit.ArkTS';
97
98    const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
99
100    let done = false;
101
102    // 接收Worker子线程的结果
103    workerInstance.onmessage = (event: MessageEvents) => {
104      console.info('MyWorker.ets onmessage');
105      if (!done) {
106        workerInstance.postMessage({ 'type': 1, 'value': 0 });
107        done = true;
108      }
109    }
110
111    workerInstance.onAllErrors = (err: ErrorEvent) => {
112      // 接收Worker子线程的错误信息
113    }
114
115    // 向Worker子线程发送训练消息
116    workerInstance.postMessage({ 'type': 0 });
117    ```
118    <!-- @[call_worker_message](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) -->
119
1203. 在MyWorker.ets文件中绑定Worker对象,当前线程即为Worker线程。在Worker线程中通过注册[onmessage()](../reference/apis-arkts/js-apis-worker.md#属性-2)回调接收宿主线程发送的消息,并通过调用[postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9-2)方法向宿主线程发送消息。
121    例如,在Worker线程中定义预测模型及其训练过程,并与宿主线程进行信息交互。
122
123    ```ts
124    // MyWorker.ets
125    import { worker, ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@kit.ArkTS';
126
127    let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
128
129    // 定义训练模型及结果
130    let result: Array<number>;
131    // 定义预测函数
132    function predict(x: number): number {
133     return result[x];
134    }
135    // 定义优化器训练过程
136    function optimize(): void {
137     result = [0];
138    }
139    // Worker线程的onmessage逻辑
140    workerPort.onmessage = (e: MessageEvents): void => {
141     // 根据传输的数据的type选择进行操作
142     switch (e.data.type as number) {
143      case 0:
144      // 进行训练
145       optimize();
146      // 训练之后发送宿主线程训练成功的消息
147       workerPort.postMessage({ type: 'message', value: 'train success.' });
148       break;
149      case 1:
150      // 执行预测
151       const output: number = predict(e.data.value as number);
152      // 发送宿主线程预测的结果
153       workerPort.postMessage({ type: 'predict', value: output });
154       break;
155      default:
156       workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
157       break;
158     }
159    }
160    ```
161    <!-- @[interact_main_thread](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/workers/MyWorker1.ts) -->
162
1634. 在Worker线程中完成任务后,可以执行销毁操作。销毁方式有两种:一是在宿主线程中销毁Worker线程;二是在Worker线程中主动销毁。
164
165    在宿主线程中通过调用[onexit()](../reference/apis-arkts/js-apis-worker.md#属性-1)回调定义Worker线程销毁后的处理逻辑。
166
167    ```ts
168    // Index.ets
169    // Worker线程销毁后,执行onexit回调方法
170    workerInstance.onexit = (): void => {
171     console.info("main thread terminate");
172    }
173    ```
174
175    方式一:在宿主线程中通过调用[terminate()](../reference/apis-arkts/js-apis-worker.md#terminate9)方法销毁Worker线程,并终止Worker接收消息。
176
177    ```ts
178    // Index.ets
179    // 销毁Worker线程
180    workerInstance.terminate();
181    ```
182
183    方式二:在Worker线程中通过调用[close()](../reference/apis-arkts/js-apis-worker.md#close9)方法主动销毁Worker线程,并终止Worker接收消息。
184
185    ```ts
186    // MyWorker.ets
187    // 销毁线程
188    workerPort.close();
189    ```
190    <!-- @[after_destroy_callback](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) -->