# CPU Intensive Task Development (TaskPool and Worker)

CPU intensive tasks are tasks that occupy a significant amount of system computing resources and that may block other tasks in the same thread. Examples include image processing, video encoding, and data analysis.

To improve CPU utilization and application responsiveness, use multithreaded concurrency to process CPU intensive tasks.

If a task can be completed in a background thread within 3 minutes, you are advised to use TaskPool. Otherwise, use Worker. The following sections use histogram processing and a time-consuming background model prediction task as examples.

## Using TaskPool to Process Histograms

1. Implement the logic of image processing.

2. Segment the data, and initiate associated task scheduling through task groups.

   Create a [task group](../reference/apis-arkts/js-apis-taskpool.md#taskgroup10), call [addTask()](../reference/apis-arkts/js-apis-taskpool.md#addtask10) to add tasks, and call [execute()](../reference/apis-arkts/js-apis-taskpool.md#taskpoolexecute10) to execute the tasks in the task group at [a high priority](../reference/apis-arkts/js-apis-taskpool.md#priority). After all the tasks in the task group are complete, the histogram processing results are returned together.

3. Summarize and process the result arrays.

```ts
import taskpool from '@ohos.taskpool';

@Concurrent
function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer {
  // Step 1: Perform specific image processing operations and other time-consuming operations.
  return dataSlice;
}

function histogramStatistic(pixelBuffer: ArrayBuffer): void {
  // Step 2: Perform concurrent scheduling for data in three segments.
  // Use an integer segment length so that the byte offsets are whole numbers.
  let number: number = Math.floor(pixelBuffer.byteLength / 3);
  let buffer1: ArrayBuffer = pixelBuffer.slice(0, number);
  let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2);
  let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2);

  let group: taskpool.TaskGroup = new taskpool.TaskGroup();
  group.addTask(imageProcessing, buffer1);
  group.addTask(imageProcessing, buffer2);
  group.addTask(imageProcessing, buffer3);

  taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => {
    // Step 3: Summarize and process the result arrays.
  })
}

@Entry
@Component
struct Index {
  @State message: string = 'Hello World'

  build() {
    Row() {
      Column() {
        Text(this.message)
          .fontSize(50)
          .fontWeight(FontWeight.Bold)
          .onClick(() => {
            let buffer: ArrayBuffer = new ArrayBuffer(24);
            histogramStatistic(buffer);
          })
      }
      .width('100%')
    }
    .height('100%')
  }
}
```

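The placeholder in Step 1 and the empty callback in Step 3 could be filled in along the following lines. This is a minimal sketch in plain TypeScript, not part of the original sample: `computeHistogram` and `mergeHistograms` are hypothetical helper names, each byte is assumed to be one 8-bit grayscale pixel, and the three tasks are run sequentially here so that the logic can be tried without TaskPool.

```typescript
// Hypothetical helper: count how often each 8-bit value occurs in one slice.
function computeHistogram(slice: ArrayBuffer): number[] {
  const bins: number[] = new Array<number>(256).fill(0);
  const pixels = new Uint8Array(slice);
  for (let i = 0; i < pixels.length; i++) {
    bins[pixels[i]] += 1;
  }
  return bins;
}

// Hypothetical helper: add the per-slice histograms bin by bin.
function mergeHistograms(partials: number[][]): number[] {
  const total: number[] = new Array<number>(256).fill(0);
  for (const partial of partials) {
    for (let bin = 0; bin < total.length; bin++) {
      total[bin] += partial[bin];
    }
  }
  return total;
}

// Sequential stand-in for the three concurrent tasks.
const pixelBuffer = new ArrayBuffer(6);
new Uint8Array(pixelBuffer).set([0, 0, 1, 255, 255, 255]);
const sliceLength = Math.floor(pixelBuffer.byteLength / 3);
const slices: ArrayBuffer[] = [
  pixelBuffer.slice(0, sliceLength),
  pixelBuffer.slice(sliceLength, sliceLength * 2),
  pixelBuffer.slice(sliceLength * 2),
];
const histogram = mergeHistograms(slices.map((s) => computeHistogram(s)));
console.info(`${histogram[0]} ${histogram[1]} ${histogram[255]}`); // prints "2 1 3"
```

In an application, the body of `computeHistogram` would presumably live in the `@Concurrent` function, and `mergeHistograms` would run in the `then()` callback, assuming the task group resolves to one result per task.
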
## Using Worker for Time-Consuming Model Prediction

The following uses the training of a region-specific house price prediction model as an example. The model predicts house prices in the region based on the house area and the number of rooms. It needs to run for a long time, and each prediction depends on the previous running result. For these reasons, Worker is used for the development.

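To make the scenario concrete, the model could be as simple as a linear regression fitted by gradient descent. The sketch below is an illustration in plain TypeScript under that assumption. It is not the model from the original sample, and the names `Sample` and `HousePriceModel`, the training data, the step count, and the learning rate are all hypothetical. It shows why state has to persist between messages: `predict` reads the weights that the last `train` call left behind.

```typescript
// Hypothetical training sample: area in units of 100 square meters, room count, price.
interface Sample {
  area: number;
  rooms: number;
  price: number;
}

class HousePriceModel {
  // Weights for area, rooms, and a constant bias term; kept between calls.
  private weights: number[] = [0, 0, 0];

  predict(area: number, rooms: number): number {
    return this.weights[0] * area + this.weights[1] * rooms + this.weights[2];
  }

  // Mean squared error of the current weights on the given samples.
  loss(samples: Sample[]): number {
    let sum = 0;
    for (const s of samples) {
      const error = this.predict(s.area, s.rooms) - s.price;
      sum += error * error;
    }
    return sum / samples.length;
  }

  // Batch gradient descent; each call continues from the previous weights.
  train(samples: Sample[], steps: number, learningRate: number): void {
    for (let step = 0; step < steps; step++) {
      const gradient = [0, 0, 0];
      for (const s of samples) {
        const error = this.predict(s.area, s.rooms) - s.price;
        gradient[0] += (2 * error * s.area) / samples.length;
        gradient[1] += (2 * error * s.rooms) / samples.length;
        gradient[2] += (2 * error) / samples.length;
      }
      for (let i = 0; i < this.weights.length; i++) {
        this.weights[i] -= learningRate * gradient[i];
      }
    }
  }
}

// Made-up data for illustration only.
const trainingSamples: Sample[] = [
  { area: 0.5, rooms: 1, price: 2.5 },
  { area: 0.8, rooms: 2, price: 4.1 },
  { area: 1.0, rooms: 3, price: 5.5 },
  { area: 1.2, rooms: 3, price: 5.9 },
  { area: 1.5, rooms: 4, price: 7.5 },
];
const priceModel = new HousePriceModel();
const lossBefore = priceModel.loss(trainingSamples);
priceModel.train(trainingSamples, 2000, 0.01);
const lossAfter = priceModel.loss(trainingSamples);
console.info(`loss before: ${lossBefore}, loss after: ${lossAfter}`);
```

A long training loop of this kind, combined with state that later predictions depend on, is the sort of workload the paragraph above has in mind for a dedicated worker thread.
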
1. In DevEco Studio, add a worker named **MyWorker** to your project.

   ![newWorker](figures/newWorker.png)

2. In the main thread, call [constructor()](../reference/apis-arkts/js-apis-worker.md#constructor9) of **ThreadWorker** to create a **Worker** object. The calling thread is the host thread.

    ```ts
    import worker from '@ohos.worker';

    const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
    ```

3. In the host thread, call [onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9) to receive messages from the worker thread, and call [postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9) to send messages to the worker thread.

   For example, the host thread sends training and prediction messages to the worker thread, and receives messages sent back by the worker thread.

    ```ts
    import worker from '@ohos.worker';

    const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
    let done = false;

    // Receive the result from the worker thread.
    workerInstance.onmessage = (() => {
      console.info('MyWorker.ts onmessage');
      if (!done) {
        workerInstance.postMessage({ 'type': 1, 'value': 0 });
        done = true;
      }
    })

    workerInstance.onerror = (() => {
      // Receive error information from the worker thread.
    })

    // Send a training message to the worker thread.
    workerInstance.postMessage({ 'type': 0 });
    ```

4. Bind the **Worker** object in the **MyWorker.ts** file. The calling thread is the worker thread.

   ```ts
   import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';

   let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
   ```

5. In the worker thread, call [onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9-1) to receive messages sent by the host thread, and call [postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9-2) to send messages to the host thread.

    For example, the prediction model and its training process are defined in the worker thread, and messages are exchanged with the main thread.

    ```ts
    import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';

    let workerPort: ThreadWorkerGlobalScope = worker.workerPort;

    // Define the training model and result.
    let result: Array<number>;

    // Define the prediction function.
    function predict(x: number): number {
      return result[x];
    }

    // Define the optimizer training process.
    function optimize(): void {
      result = [0];
    }

    // onmessage logic of the worker thread.
    workerPort.onmessage = (e: MessageEvents): void => {
      // Perform operations based on the type of data to transmit.
      switch (e.data.type as number) {
        case 0:
          // Perform training.
          optimize();
          // Send a training success message to the main thread after training is complete.
          workerPort.postMessage({ type: 'message', value: 'train success.' });
          break;
        case 1:
          // Execute the prediction.
          const output: number = predict(e.data.value as number);
          // Send the prediction result to the main thread.
          workerPort.postMessage({ type: 'predict', value: output });
          break;
        default:
          workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
          break;
      }
    }
    ```

6. After the task is completed in the worker thread, destroy the worker thread. Either the worker thread itself or the host thread can destroy it. In the host thread, set the [onexit()](../reference/apis-arkts/js-apis-worker.md#onexit9) callback to define the processing logic that runs after the worker thread is destroyed.

    ```ts
    // After the worker thread is destroyed, execute the onexit() callback.
    workerInstance.onexit = (): void => {
      console.info("main thread terminate");
    }
    ```

    Method 1: In the host thread, call [terminate()](../reference/apis-arkts/js-apis-worker.md#terminate9) to destroy the worker thread and stop the worker thread from receiving messages.

    ```ts
    // Destroy the worker thread.
    workerInstance.terminate();
    ```

    Method 2: In the worker thread, call [close()](../reference/apis-arkts/js-apis-worker.md#close9) to destroy the worker thread and stop the worker thread from receiving messages.

    ```ts
    // Destroy the worker thread.
    workerPort.close();
    ```

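The exchange in steps 3 and 5 follows a small request and reply protocol: type `0` requests training, and type `1` requests a prediction. The sketch below models that protocol in plain TypeScript so that the dispatch logic can be exercised without a real worker thread. It is an illustration only: the type names `WorkerRequest` and `WorkerReply` and the functions `handleRequest` and `nextRequest` are hypothetical, and the guard for an untrained model is an addition that the sample above does not have.

```typescript
// Hypothetical message shapes mirroring the objects passed to postMessage().
type WorkerRequest = { type: 0 } | { type: 1; value: number };
type WorkerReply =
  | { type: 'message'; value: string }
  | { type: 'predict'; value: number };

// Worker-side state: the training result, undefined until training has run.
let trained: number[] | undefined;

// Worker side: turn one request into one reply.
function handleRequest(request: WorkerRequest): WorkerReply {
  switch (request.type) {
    case 0:
      trained = [0]; // stands in for the optimizer training process
      return { type: 'message', value: 'train success.' };
    case 1:
      if (trained === undefined) {
        return { type: 'message', value: 'model is not trained' };
      }
      return { type: 'predict', value: trained[request.value] };
    default:
      return { type: 'message', value: 'send message is invalid' };
  }
}

// Host side: decide what to send next based on the reply just received.
function nextRequest(reply: WorkerReply): WorkerRequest | undefined {
  if (reply.type === 'message' && reply.value === 'train success.') {
    return { type: 1, value: 0 }; // training finished, ask for a prediction
  }
  return undefined; // prediction or error received, nothing more to send
}

// In-process stand-in for the postMessage() round trips.
const replies: WorkerReply[] = [];
let pending: WorkerRequest | undefined = { type: 0 };
while (pending !== undefined) {
  const reply = handleRequest(pending);
  replies.push(reply);
  pending = nextRequest(reply);
}
console.info(JSON.stringify(replies));
```

In an application, `handleRequest` would presumably be called from `workerPort.onmessage` and `nextRequest` from `workerInstance.onmessage`. Deciding the next step from the reply type, rather than from a `done` flag, keeps the host from requesting a prediction after a failed or invalid training message.
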