1# CPU密集型任务开发指导 (TaskPool和Worker) 2<!--Kit: ArkTS--> 3<!--Subsystem: CommonLibrary--> 4<!--Owner: @lijiamin2025--> 5<!--Designer: @weng-changcheng--> 6<!--Tester: @kirl75; @zsw_zhushiwei--> 7<!--Adviser: @ge-yafang--> 8 9 10CPU密集型任务是指需要占用系统资源进行大量计算的任务,这类任务需要长时间运行,会阻塞线程中其他事件的处理,因此不适合在UI主线程中执行。例如图像处理、视频编码、数据分析等。 11 12 13基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。 14 15 16当任务不需要长时间(3分钟)占用后台线程,而是一个个独立的任务时,推荐使用TaskPool,反之推荐使用Worker。 17 18接下来将分别以图像直方图处理和后台长时间模型预测任务为例进行说明。 19 20 21## 使用TaskPool进行图像直方图处理 22 231. 实现图像处理的业务逻辑。 24 252. 对数据进行分段,并通过任务组发起关联任务调度。 26 创建[TaskGroup](../reference/apis-arkts/js-apis-taskpool.md#taskgroup10),通过[addTask()](../reference/apis-arkts/js-apis-taskpool.md#addtask10)添加对应的任务,然后通过[execute()](../reference/apis-arkts/js-apis-taskpool.md#taskpoolexecute10)执行任务组,并指定为[高优先级](../reference/apis-arkts/js-apis-taskpool.md#priority)。在当前任务组所有任务结束后,会将直方图处理结果同时返回。 27 283. 汇总处理结果数组。 29 30```ts 31// Index.ets 32import { taskpool } from '@kit.ArkTS'; 33 34@Concurrent 35function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer { 36 // 步骤1: 具体的图像处理操作及其他耗时操作 37 return dataSlice; 38} 39 40function histogramStatistic(pixelBuffer: ArrayBuffer): void { 41 // 步骤2: 分成三段并发调度 42 let number: number = pixelBuffer.byteLength / 3; 43 let buffer1: ArrayBuffer = pixelBuffer.slice(0, number); 44 let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2); 45 let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2); 46 47 let group: taskpool.TaskGroup = new taskpool.TaskGroup(); 48 group.addTask(imageProcessing, buffer1); 49 group.addTask(imageProcessing, buffer2); 50 group.addTask(imageProcessing, buffer3); 51 52 taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => { 53 // 步骤3: 结果数组汇总处理 54 console.info('execute group success'); 55 }) 56} 57 58@Entry 59@Component 60struct Index { 61 @State message: string = 'Hello World' 62 63 build() { 64 Row() { 65 Column() { 66 Text(this.message) 67 .fontSize(50) 68 .fontWeight(FontWeight.Bold) 69 .onClick(() => { 70 let buffer: ArrayBuffer = new ArrayBuffer(24); 71 histogramStatistic(buffer); 72 }) 73 } 74 .width('100%') 75 } 76 .height('100%') 77 } 78} 79``` 80<!-- @[process_image_histogram](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) --> 81 82 83## 使用Worker进行长时间数据分析 84 85本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。 86 871. DevEco Studio提供了Worker创建的模板,创建一个Worker线程,例如命名为“MyWorker”。 88 89  90 912. 在宿主线程中首先调用ThreadWorker的[constructor()](../reference/apis-arkts/js-apis-worker.md#constructor9)方法创建Worker对象;然后通过注册[onmessage()](../reference/apis-arkts/js-apis-worker.md#属性-1)回调接收Worker线程发送过来的消息;最后通过调用[postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9)方法向Worker线程发送消息。 92 例如,向Worker线程发送训练和预测的消息,并接收Worker线程发送回来的消息。 93 94 ```ts 95 // Index.ets 96 import { ErrorEvent, MessageEvents, worker } from '@kit.ArkTS'; 97 98 const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets'); 99 100 let done = false; 101 102 // 接收Worker子线程的结果 103 workerInstance.onmessage = (event: MessageEvents) => { 104 console.info('MyWorker.ets onmessage'); 105 if (!done) { 106 workerInstance.postMessage({ 'type': 1, 'value': 0 }); 107 done = true; 108 } 109 } 110 111 workerInstance.onAllErrors = (err: ErrorEvent) => { 112 // 接收Worker子线程的错误信息 113 } 114 115 // 向Worker子线程发送训练消息 116 workerInstance.postMessage({ 'type': 0 }); 117 ``` 118 <!-- @[call_worker_message](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) --> 119 1203. 在MyWorker.ets文件中绑定Worker对象,当前线程即为Worker线程。在Worker线程中通过注册[onmessage()](../reference/apis-arkts/js-apis-worker.md#属性-2)回调接收宿主线程发送的消息,并通过调用[postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9-2)方法向宿主线程发送消息。 121 例如,在Worker线程中定义预测模型及其训练过程,并与宿主线程进行信息交互。 122 123 ```ts 124 // MyWorker.ets 125 import { worker, ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@kit.ArkTS'; 126 127 let workerPort: ThreadWorkerGlobalScope = worker.workerPort; 128 129 // 定义训练模型及结果 130 let result: Array<number>; 131 // 定义预测函数 132 function predict(x: number): number { 133 return result[x]; 134 } 135 // 定义优化器训练过程 136 function optimize(): void { 137 result = [0]; 138 } 139 // Worker线程的onmessage逻辑 140 workerPort.onmessage = (e: MessageEvents): void => { 141 // 根据传输的数据的type选择进行操作 142 switch (e.data.type as number) { 143 case 0: 144 // 进行训练 145 optimize(); 146 // 训练之后发送宿主线程训练成功的消息 147 workerPort.postMessage({ type: 'message', value: 'train success.' }); 148 break; 149 case 1: 150 // 执行预测 151 const output: number = predict(e.data.value as number); 152 // 发送宿主线程预测的结果 153 workerPort.postMessage({ type: 'predict', value: output }); 154 break; 155 default: 156 workerPort.postMessage({ type: 'message', value: 'send message is invalid' }); 157 break; 158 } 159 } 160 ``` 161 <!-- @[interact_main_thread](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/workers/MyWorker1.ts) --> 162 1634. 在Worker线程中完成任务后,可以执行销毁操作。销毁方式有两种:一是在宿主线程中销毁Worker线程;二是在Worker线程中主动销毁。 164 165 在宿主线程中通过调用[onexit()](../reference/apis-arkts/js-apis-worker.md#属性-1)回调定义Worker线程销毁后的处理逻辑。 166 167 ```ts 168 // Index.ets 169 // Worker线程销毁后,执行onexit回调方法 170 workerInstance.onexit = (): void => { 171 console.info("main thread terminate"); 172 } 173 ``` 174 175 方式一:在宿主线程中通过调用[terminate()](../reference/apis-arkts/js-apis-worker.md#terminate9)方法销毁Worker线程,并终止Worker接收消息。 176 177 ```ts 178 // Index.ets 179 // 销毁Worker线程 180 workerInstance.terminate(); 181 ``` 182 183 方式二:在Worker线程中通过调用[close()](../reference/apis-arkts/js-apis-worker.md#close9)方法主动销毁Worker线程,并终止Worker接收消息。 184 185 ```ts 186 // MyWorker.ets 187 // 销毁线程 188 workerPort.close(); 189 ``` 190 <!-- @[after_destroy_callback](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/ApplicationMultithreadingDevelopment/ApplicationMultithreading/entry/src/main/ets/managers/CpuIntensiveTaskDevelopment.ets) -->