• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# CPU Intensive Task Development (TaskPool and Worker)
2
3
4CPU intensive tasks are those that require significant computational resources and can run for extended periods. If executed in the UI main thread, these tasks can block other events. Examples include image processing, video encoding, and data analysis.
5
6
7To improve CPU utilization and enhance application responsiveness, you can use multithreaded concurrency in processing CPU intensive tasks.
8
9
10When tasks are discrete and do not need to occupy a background thread for an extended period (3 minutes), TaskPool is recommended. For tasks that require long-running background processing, Worker is more suitable.
11
12The following examples illustrate how to handle image histogram processing using TaskPool and long-running model prediction tasks using Worker.
13
14
15## Using TaskPool for Image Histogram Processing
16
171. Implement the logic of image processing.
18
192. Segment the data, and schedule related tasks using a TaskGroup.
20
21   Create a [task group](../reference/apis-arkts/js-apis-taskpool.md#taskgroup10), call [addTask()](../reference/apis-arkts/js-apis-taskpool.md#addtask10) to add tasks, and call [execute()](../reference/apis-arkts/js-apis-taskpool.md#taskpoolexecute10) to execute the tasks in the task group, specifying [high priority](../reference/apis-arkts/js-apis-taskpool.md#priority). After all the tasks in the group are complete, the histogram processing result is returned collectively.
22
233. Aggregate and process the result arrays.
24
25```ts
26import { taskpool } from '@kit.ArkTS';
27
28@Concurrent
29function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer {
30  // Step 1: Perform specific image processing operations and other time-consuming operations.
31  return dataSlice;
32}
33
34function histogramStatistic(pixelBuffer: ArrayBuffer): void {
35  // Step 2: Segment the data and schedule tasks concurrently.
36  let number: number = pixelBuffer.byteLength / 3;
37  let buffer1: ArrayBuffer = pixelBuffer.slice(0, number);
38  let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2);
39  let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2);
40
41  let group: taskpool.TaskGroup = new taskpool.TaskGroup();
42  group.addTask(imageProcessing, buffer1);
43  group.addTask(imageProcessing, buffer2);
44  group.addTask(imageProcessing, buffer3);
45
46  taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => {
47    // Step 3: Aggregate and process the result arrays.
48  })
49}
50
51@Entry
52@Component
53struct Index {
54  @State message: string = 'Hello World'
55
56  build() {
57    Row() {
58      Column() {
59        Text(this.message)
60          .fontSize(50)
61          .fontWeight(FontWeight.Bold)
62          .onClick(() => {
63            let buffer: ArrayBuffer = new ArrayBuffer(24);
64            histogramStatistic(buffer);
65          })
66      }
67      .width('100%')
68    }
69    .height('100%')
70  }
71}
72```
73<!-- @[process_image_histogram](https://gitee.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) -->
74
75
76## Using Worker for Time-Consuming Data Analysis
77
78This example demonstrates training a simple housing price prediction model using housing data from a specific region. The model supports predicting housing prices based on input parameters like house size and number of rooms. Since the model requires long-running execution and the prediction relies on the model's previous results, Worker is the appropriate choice.
79
801. In DevEco Studio, add a Worker thread named **MyWorker** to your project.
81
82   ![newWorker](figures/newWorker.png)
83
842. In the host thread, call [constructor()](../reference/apis-arkts/js-apis-worker.md#constructor9) of **ThreadWorker** to create a Worker object.
85
86    ```ts
87    // Index.ets
88    import { worker } from '@kit.ArkTS';
89
90    const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
91    ```
92
933. In the host thread, call [onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9) to receive messages from the Worker thread, and call [postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9) to send messages to the Worker thread.
94
95   For example, the host thread sends training and prediction messages to the Worker thread and receive responses.
96
97    ```ts
98    // Index.ets
99    let done = false;
100
101    // Receive results from the Worker thread.
102    workerInstance.onmessage = (() => {
103      console.info('MyWorker.ets onmessage');
104      if (!done) {
105        workerInstance.postMessage({ 'type': 1, 'value': 0 });
106        done = true;
107      }
108    })
109
110    workerInstance.onAllErrors = (() => {
111      // Receive error messages from the Worker thread.
112    })
113
114    // Send a training message to the Worker thread.
115    workerInstance.postMessage({ 'type': 0 });
116    ```
117    <!-- @[call_worker_message](https://gitee.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) -->
118
1194. Bind the Worker object in the **MyWorker.ets** file. The calling thread is the Worker thread.
120
121   ```ts
122   // MyWorker.ets
123   import { worker, ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@kit.ArkTS';
124
125   let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
126   ```
127
1285. In the Worker thread, call [onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9-1) to receive messages sent by the host thread, and call [postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9-2) to send messages to the host thread.
129
130    For example, define the prediction model and training process in the Worker thread and interact with the host thread.
131
132    ```ts
133    // MyWorker.ets
134    // Define the training model and results.
135    let result: Array<number>;
136    // Define the prediction function.
137    function predict(x: number): number {
138     return result[x];
139    }
140    // Define the optimizer training process.
141    function optimize(): void {
142     result = [0];
143    }
144    // onmessage logic of the Worker thread.
145    workerPort.onmessage = (e: MessageEvents): void => {
146     // Perform operations based on the type of data to transmit.
147     switch (e.data.type as number) {
148      case 0:
149      // Perform training.
150       optimize();
151      // Send a training success message to the host thread after training.
152       workerPort.postMessage({ type: 'message', value: 'train success.' });
153       break;
154      case 1:
155      // Perform prediction.
156       const output: number = predict(e.data.value as number);
157      // Send the prediction result to the host thread.
158       workerPort.postMessage({ type: 'predict', value: output });
159       break;
160      default:
161       workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
162       break;
163     }
164    }
165    ```
166    <!-- @[interact_main_thread](https://gitee.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/workers/MyWorker1.ts) -->
167
1686. After the task is completed, destroy the Worker thread. The Worker thread can be destroyed by itself or the host thread.
169
170    After the Worker thread is destroyed, call [onexit()](../reference/apis-arkts/js-apis-worker.md#onexit9) in the host thread to define the logic for handling the destruction.
171
172    ```ts
173    // Index.ets
174    // After the Worker thread is destroyed, execute the onexit callback.
175    workerInstance.onexit = (): void => {
176     console.info("main thread terminate");
177    }
178    ```
179
180    Method 1: In the host thread, call [terminate()](../reference/apis-arkts/js-apis-worker.md#terminate9) to destroy the Worker thread and stop it from receiving messages.
181
182    ```ts
183    // Index.ets
184    // Destroy the Worker thread.
185    workerInstance.terminate();
186    ```
187
188    Method 2: In the Worker thread, call [close()](../reference/apis-arkts/js-apis-worker.md#close9) to destroy the Worker thread and stop it from receiving messages.
189
190    ```ts
191    // MyWorker.ets
192    // Destroy the Worker thread.
193    workerPort.close();
194    ```
195    <!-- @[after_destroy_callback](https://gitee.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) -->
196