• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# CPU密集型任务开发指导
2
3
4CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在主线程进行。例如图像处理、视频编码、数据分析等。
5
6
7基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。
8
9
10当进行一系列同步任务时,推荐使用Worker;而进行大量或调度点较为分散的独立任务时,不方便使用8个Worker去做负载管理,推荐采用TaskPool。接下来将以图像直方图处理以及后台长时间的模型预测任务分别进行举例。
11
12
13## 使用TaskPool进行图像直方图处理
14
151. 实现图像处理的业务逻辑。
16
172. 数据分段,将各段数据通过不同任务的执行完成图像处理。
18   创建[Task](../reference/apis/js-apis-taskpool.md#task),通过[execute()](../reference/apis/js-apis-taskpool.md#taskpoolexecute-1)执行任务,在当前任务结束后,会将直方图处理结果同时返回。
19
203. 结果处理。
21
22
23```ts
24import taskpool from '@ohos.taskpool';
25
26@Concurrent
27function imageProcessing(dataSlice: ArrayBuffer) {
28  // 步骤1: 具体的图像处理操作及其他耗时操作
29  return dataSlice;
30}
31
32function histogramStatistic(pixelBuffer: ArrayBuffer) {
33  // 步骤2: 分成三段并发调度
34  let number = pixelBuffer.byteLength / 3;
35  let buffer1 = pixelBuffer.slice(0, number);
36  let buffer2 = pixelBuffer.slice(number, number * 2);
37  let buffer3 = pixelBuffer.slice(number * 2);
38
39  let task1 = new taskpool.Task(imageProcessing, buffer1);
40  let task2 = new taskpool.Task(imageProcessing, buffer2);
41  let task3 = new taskpool.Task(imageProcessing, buffer3);
42
43  taskpool.execute(task1).then((ret: ArrayBuffer[]) => {
44    // 步骤3: 结果处理
45  });
46  taskpool.execute(task2).then((ret: ArrayBuffer[]) => {
47    // 步骤3: 结果处理
48  });
49  taskpool.execute(task3).then((ret: ArrayBuffer[]) => {
50    // 步骤3: 结果处理
51  });
52}
53
54@Entry
55@Component
56struct Index {
57  @State message: string = 'Hello World'
58
59  build() {
60    Row() {
61      Column() {
62        Text(this.message)
63          .fontSize(50)
64          .fontWeight(FontWeight.Bold)
65          .onClick(() => {
66            let data: ArrayBuffer;
67            histogramStatistic(data);
68          })
69      }
70      .width('100%')
71    }
72    .height('100%')
73  }
74}
75```
76
77
78## 使用Worker进行长时间数据分析
79
80本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。
81
821. DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”。
83
84   ![newWorker](figures/newWorker.png)
85
862. 在主线程中通过调用ThreadWorker的[constructor()](../reference/apis/js-apis-worker.md#constructor9)方法创建Worker对象,当前线程为宿主线程。
87
88   ```js
89   import worker from '@ohos.worker';
90
91   const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
92   ```
93
943. 在宿主线程中通过调用[onmessage()](../reference/apis/js-apis-worker.md#onmessage9)方法接收Worker线程发送过来的消息,并通过调用[postMessage()](../reference/apis/js-apis-worker.md#postmessage9)方法向Worker线程发送消息。
95   例如向Worker线程发送训练和预测的消息,同时接收Worker线程发送回来的消息。
96
97
98   ```js
99   // 接收Worker子线程的结果
100   workerInstance.onmessage = function(e) {
101     // data:主线程发送的信息
102     let data = e.data;
103     console.info('MyWorker.ts onmessage');
104     // 在Worker线程中进行耗时操作
105   }
106
107   workerInstance.onerror = function (d) {
108     // 接收Worker子线程的错误信息
109   }
110
111   // 向Worker子线程发送训练消息
112   workerInstance.postMessage({ 'type': 0 });
113   // 向Worker子线程发送预测消息
114   workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
115   ```
116
1174. 在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程。
118
119   ```js
120   import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
121
122   let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
123   ```
124
1255. 在Worker线程中通过调用[onmessage()](../reference/apis/js-apis-worker.md#onmessage9-1)方法接收宿主线程发送的消息内容,并通过调用[postMessage()](../reference/apis/js-apis-worker.md#postmessage9-2)方法向宿主线程发送消息。
126   例如在Worker线程中定义预测模型及其训练过程,同时与主线程进行信息交互。
127
128
129   ```js
130   import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
131
132   let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
133
134   // 定义训练模型及结果
135   let result;
136
137   // 定义预测函数
138   function predict(x) {
139     return result[x];
140   }
141
142   // 定义优化器训练过程
143   function optimize() {
144     result = {};
145   }
146
147   // Worker线程的onmessage逻辑
148   workerPort.onmessage = function (e: MessageEvents) {
149     let data = e.data
150     // 根据传输的数据的type选择进行操作
151     switch (data.type) {
152       case 0:
153       // 进行训练
154         optimize();
155       // 训练之后发送主线程训练成功的消息
156         workerPort.postMessage({ type: 'message', value: 'train success.' });
157         break;
158       case 1:
159       // 执行预测
160         const output = predict(data.value);
161       // 发送主线程预测的结果
162         workerPort.postMessage({ type: 'predict', value: output });
163         break;
164       default:
165         workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
166         break;
167     }
168   }
169   ```
170
1716. 在Worker线程中完成任务之后,执行Worker线程销毁操作。销毁线程的方式主要有两种:根据需要可以在宿主线程中对Worker线程进行销毁;也可以在Worker线程中主动销毁Worker线程。
172
173   在宿主线程中通过调用[onexit()](../reference/apis/js-apis-worker.md#onexit9)方法定义Worker线程销毁后的处理逻辑。
174
175   ```js
176   // Worker线程销毁后,执行onexit回调方法
177   workerInstance.onexit = function() {
178     console.info("main thread terminate");
179   }
180   ```
181
182   方式一:在宿主线程中通过调用[terminate()](../reference/apis/js-apis-worker.md#terminate9)方法销毁Worker线程,并终止Worker接收消息。
183
184   ```js
185   // 销毁Worker线程
186   workerInstance.terminate();
187   ```
188
189   方式二:在Worker线程中通过调用[close()](../reference/apis/js-apis-worker.md#close9)方法主动销毁Worker线程,并终止Worker接收消息。
190
191   ```js
192   // 销毁线程
193   workerPort.close();
194   ```
195