1# CPU密集型任务开发指导 (TaskPool和Worker) 2 3 4CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在主线程进行。例如图像处理、视频编码、数据分析等。 5 6 7基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。 8 9 10当任务不需要长时间(3分钟)占据后台线程,而是一个个独立的任务时,推荐使用TaskPool,反之推荐使用Worker。接下来将以图像直方图处理以及后台长时间的模型预测任务分别进行举例。 11 12 13## 使用TaskPool进行图像直方图处理 14 151. 实现图像处理的业务逻辑。 16 172. 数据分段,通过任务组发起关联任务调度。 18 创建[TaskGroup](../reference/apis-arkts/js-apis-taskpool.md#taskgroup10)并通过[addTask()](../reference/apis-arkts/js-apis-taskpool.md#addtask10)添加对应的任务,通过[execute()](../reference/apis-arkts/js-apis-taskpool.md#taskpoolexecute10)执行任务组,并指定为[高优先级](../reference/apis-arkts/js-apis-taskpool.md#priority),在当前任务组所有任务结束后,会将直方图处理结果同时返回。 19 203. 结果数组汇总处理。 21 22```ts 23import taskpool from '@ohos.taskpool'; 24 25@Concurrent 26function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer { 27 // 步骤1: 具体的图像处理操作及其他耗时操作 28 return dataSlice; 29} 30 31function histogramStatistic(pixelBuffer: ArrayBuffer): void { 32 // 步骤2: 分成三段并发调度 33 let number: number = pixelBuffer.byteLength / 3; 34 let buffer1: ArrayBuffer = pixelBuffer.slice(0, number); 35 let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2); 36 let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2); 37 38 let group: taskpool.TaskGroup = new taskpool.TaskGroup(); 39 group.addTask(imageProcessing, buffer1); 40 group.addTask(imageProcessing, buffer2); 41 group.addTask(imageProcessing, buffer3); 42 43 taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => { 44 // 步骤3: 结果数组汇总处理 45 }) 46} 47 48@Entry 49@Component 50struct Index { 51 @State message: string = 'Hello World' 52 53 build() { 54 Row() { 55 Column() { 56 Text(this.message) 57 .fontSize(50) 58 .fontWeight(FontWeight.Bold) 59 .onClick(() => { 60 let buffer: ArrayBuffer = new ArrayBuffer(24); 61 histogramStatistic(buffer); 62 }) 63 } 64 .width('100%') 65 } 66 .height('100%') 67 } 68} 69``` 70 71 72## 使用Worker进行长时间数据分析 73 74本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。 75 761. DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”。 77 78  79 802. 在主线程中通过调用ThreadWorker的[constructor()](../reference/apis-arkts/js-apis-worker.md#constructor9)方法创建Worker对象,当前线程为宿主线程。 81 82 ```ts 83 import worker from '@ohos.worker'; 84 85 const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts'); 86 ``` 87 883. 在宿主线程中通过调用[onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9)方法接收Worker线程发送过来的消息,并通过调用[postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9)方法向Worker线程发送消息。 89 例如向Worker线程发送训练和预测的消息,同时接收Worker线程发送回来的消息。 90 91 ```ts 92 import worker from '@ohos.worker'; 93 94 const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts'); 95 let done = false; 96 97 // 接收Worker子线程的结果 98 workerInstance.onmessage = (() => { 99 console.info('MyWorker.ts onmessage'); 100 if (!done) { 101 workerInstance.postMessage({ 'type': 1, 'value': 0 }); 102 done = true; 103 } 104 }) 105 106 workerInstance.onerror = (() => { 107 // 接收Worker子线程的错误信息 108 }) 109 110 // 向Worker子线程发送训练消息 111 workerInstance.postMessage({ 'type': 0 }); 112 ``` 113 1144. 在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程。 115 116 ```ts 117 import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker'; 118 119 let workerPort: ThreadWorkerGlobalScope = worker.workerPort; 120 ``` 121 1225. 在Worker线程中通过调用[onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9-1)方法接收宿主线程发送的消息内容,并通过调用[postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9-2)方法向宿主线程发送消息。 123 例如在Worker线程中定义预测模型及其训练过程,同时与主线程进行信息交互。 124 125 ```ts 126 import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker'; 127 let workerPort: ThreadWorkerGlobalScope = worker.workerPort; 128 // 定义训练模型及结果 129 let result: Array<number>; 130 // 定义预测函数 131 function predict(x: number): number { 132 return result[x]; 133 } 134 // 定义优化器训练过程 135 function optimize(): void { 136 result = [0]; 137 } 138 // Worker线程的onmessage逻辑 139 workerPort.onmessage = (e: MessageEvents): void => { 140 // 根据传输的数据的type选择进行操作 141 switch (e.data.type as number) { 142 case 0: 143 // 进行训练 144 optimize(); 145 // 训练之后发送主线程训练成功的消息 146 workerPort.postMessage({ type: 'message', value: 'train success.' }); 147 break; 148 case 1: 149 // 执行预测 150 const output: number = predict(e.data.value as number); 151 // 发送主线程预测的结果 152 workerPort.postMessage({ type: 'predict', value: output }); 153 break; 154 default: 155 workerPort.postMessage({ type: 'message', value: 'send message is invalid' }); 156 break; 157 } 158 } 159 ``` 160 1616. 在Worker线程中完成任务之后,执行Worker线程销毁操作。销毁线程的方式主要有两种:根据需要可以在宿主线程中对Worker线程进行销毁;也可以在Worker线程中主动销毁Worker线程。 162 163 在宿主线程中通过调用[onexit()](../reference/apis-arkts/js-apis-worker.md#onexit9)方法定义Worker线程销毁后的处理逻辑。 164 165 ```ts 166 // Worker线程销毁后,执行onexit回调方法 167 workerInstance.onexit = (): void => { 168 console.info("main thread terminate"); 169 } 170 ``` 171 172 方式一:在宿主线程中通过调用[terminate()](../reference/apis-arkts/js-apis-worker.md#terminate9)方法销毁Worker线程,并终止Worker接收息。 173 174 ```ts 175 // 销毁Worker线程 176 workerInstance.terminate(); 177 ``` 178 179 方式二:在Worker线程中通过调用[close()](../reference/apis-arkts/js-apis-worker.md#close9)方法主动销毁Worker线程,并终止Worker接收消息。 180 181 ```ts 182 // 销毁线程 183 workerPort.close(); 184 ```