1# CPU密集型任务开发指导 2 3 4CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在主线程进行。例如图像处理、视频编码、数据分析等。 5 6 7基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。 8 9 10当进行一系列同步任务时,推荐使用Worker;而进行大量或调度点较为分散的独立任务时,不方便使用8个Worker去做负载管理,推荐采用TaskPool。接下来将以图像直方图处理以及后台长时间的模型预测任务分别进行举例。 11 12 13## 使用TaskPool进行图像直方图处理 14 151. 实现图像处理的业务逻辑。 16 172. 数据分段,将各段数据通过不同任务的执行完成图像处理。 18 创建[Task](../reference/apis/js-apis-taskpool.md#task),通过[execute()](../reference/apis/js-apis-taskpool.md#taskpoolexecute-1)执行任务,在当前任务结束后,会将直方图处理结果同时返回。 19 203. 结果处理。 21 22 23```ts 24import taskpool from '@ohos.taskpool'; 25 26@Concurrent 27function imageProcessing(dataSlice: ArrayBuffer) { 28 // 步骤1: 具体的图像处理操作及其他耗时操作 29 return dataSlice; 30} 31 32function histogramStatistic(pixelBuffer: ArrayBuffer) { 33 // 步骤2: 分成三段并发调度 34 let number = pixelBuffer.byteLength / 3; 35 let buffer1 = pixelBuffer.slice(0, number); 36 let buffer2 = pixelBuffer.slice(number, number * 2); 37 let buffer3 = pixelBuffer.slice(number * 2); 38 39 let task1 = new taskpool.Task(imageProcessing, buffer1); 40 let task2 = new taskpool.Task(imageProcessing, buffer2); 41 let task3 = new taskpool.Task(imageProcessing, buffer3); 42 43 taskpool.execute(task1).then((ret: ArrayBuffer[]) => { 44 // 步骤3: 结果处理 45 }); 46 taskpool.execute(task2).then((ret: ArrayBuffer[]) => { 47 // 步骤3: 结果处理 48 }); 49 taskpool.execute(task3).then((ret: ArrayBuffer[]) => { 50 // 步骤3: 结果处理 51 }); 52} 53 54@Entry 55@Component 56struct Index { 57 @State message: string = 'Hello World' 58 59 build() { 60 Row() { 61 Column() { 62 Text(this.message) 63 .fontSize(50) 64 .fontWeight(FontWeight.Bold) 65 .onClick(() => { 66 let data: ArrayBuffer; 67 histogramStatistic(data); 68 }) 69 } 70 .width('100%') 71 } 72 .height('100%') 73 } 74} 75``` 76 77 78## 使用Worker进行长时间数据分析 79 80本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。 81 821. DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”。 83 84  85 862. 在主线程中通过调用ThreadWorker的[constructor()](../reference/apis/js-apis-worker.md#constructor9)方法创建Worker对象,当前线程为宿主线程。 87 88 ```js 89 import worker from '@ohos.worker'; 90 91 const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts'); 92 ``` 93 943. 在宿主线程中通过调用[onmessage()](../reference/apis/js-apis-worker.md#onmessage9)方法接收Worker线程发送过来的消息,并通过调用[postMessage()](../reference/apis/js-apis-worker.md#postmessage9)方法向Worker线程发送消息。 95 例如向Worker线程发送训练和预测的消息,同时接收Worker线程发送回来的消息。 96 97 98 ```js 99 // 接收Worker子线程的结果 100 workerInstance.onmessage = function(e) { 101 // data:主线程发送的信息 102 let data = e.data; 103 console.info('MyWorker.ts onmessage'); 104 // 在Worker线程中进行耗时操作 105 } 106 107 workerInstance.onerror = function (d) { 108 // 接收Worker子线程的错误信息 109 } 110 111 // 向Worker子线程发送训练消息 112 workerInstance.postMessage({ 'type': 0 }); 113 // 向Worker子线程发送预测消息 114 workerInstance.postMessage({ 'type': 1, 'value': [90, 5] }); 115 ``` 116 1174. 在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程。 118 119 ```js 120 import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker'; 121 122 let workerPort: ThreadWorkerGlobalScope = worker.workerPort; 123 ``` 124 1255. 在Worker线程中通过调用[onmessage()](../reference/apis/js-apis-worker.md#onmessage9-1)方法接收宿主线程发送的消息内容,并通过调用[postMessage()](../reference/apis/js-apis-worker.md#postmessage9-2)方法向宿主线程发送消息。 126 例如在Worker线程中定义预测模型及其训练过程,同时与主线程进行信息交互。 127 128 129 ```js 130 import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker'; 131 132 let workerPort: ThreadWorkerGlobalScope = worker.workerPort; 133 134 // 定义训练模型及结果 135 let result; 136 137 // 定义预测函数 138 function predict(x) { 139 return result[x]; 140 } 141 142 // 定义优化器训练过程 143 function optimize() { 144 result = {}; 145 } 146 147 // Worker线程的onmessage逻辑 148 workerPort.onmessage = function (e: MessageEvents) { 149 let data = e.data 150 // 根据传输的数据的type选择进行操作 151 switch (data.type) { 152 case 0: 153 // 进行训练 154 optimize(); 155 // 训练之后发送主线程训练成功的消息 156 workerPort.postMessage({ type: 'message', value: 'train success.' }); 157 break; 158 case 1: 159 // 执行预测 160 const output = predict(data.value); 161 // 发送主线程预测的结果 162 workerPort.postMessage({ type: 'predict', value: output }); 163 break; 164 default: 165 workerPort.postMessage({ type: 'message', value: 'send message is invalid' }); 166 break; 167 } 168 } 169 ``` 170 1716. 在Worker线程中完成任务之后,执行Worker线程销毁操作。销毁线程的方式主要有两种:根据需要可以在宿主线程中对Worker线程进行销毁;也可以在Worker线程中主动销毁Worker线程。 172 173 在宿主线程中通过调用[onexit()](../reference/apis/js-apis-worker.md#onexit9)方法定义Worker线程销毁后的处理逻辑。 174 175 ```js 176 // Worker线程销毁后,执行onexit回调方法 177 workerInstance.onexit = function() { 178 console.info("main thread terminate"); 179 } 180 ``` 181 182 方式一:在宿主线程中通过调用[terminate()](../reference/apis/js-apis-worker.md#terminate9)方法销毁Worker线程,并终止Worker接收消息。 183 184 ```js 185 // 销毁Worker线程 186 workerInstance.terminate(); 187 ``` 188 189 方式二:在Worker线程中通过调用[close()](../reference/apis/js-apis-worker.md#close9)方法主动销毁Worker线程,并终止Worker接收消息。 190 191 ```js 192 // 销毁线程 193 workerPort.close(); 194 ``` 195