1# 多级Worker间高性能消息通信 2 3多级[Worker](worker-introduction.md)(即通过父Worker创建子Worker的机制形成层级线程关系)间通信是一种常见的需求,由于Worker线程生命周期由用户自行管理,因此需要注意多级Worker生命周期的正确管理,建议开发者确保销毁父Worker前先销毁所有子Worker。 4 5本文介绍如何在多级Worker间实现高性能消息通信,高性能消息通信的关键在于[Sendable对象](arkts-sendable.md),结合Worker的[postMessageWithSharedSendable接口](../reference/apis-arkts/js-apis-worker.md#postmessagewithsharedsendable12),可以实现线程间高性能的对象传递。以数据克隆场景为例,假设有三个Worker,一个父Worker和两个子Worker,父Worker负责创建子Worker,并向子Worker发送数据克隆任务,子Worker负责接收任务并执行数据克隆操作,完成后将克隆结果返回给父Worker。 6 71. 准备一个Sendable类CopyEntry,用于封装克隆任务数据。 8 9 ```ts 10 // CopyEntry.ets 11 @Sendable 12 export class CopyEntry { 13 // 克隆类型 14 type: string; 15 // 文件路径 16 filePath: string; 17 constructor(type: string, filePath: string) { 18 this.type = type; 19 this.filePath = filePath; 20 } 21 } 22 ``` 23 242. 准备两个Worker文件,父Worker文件为`ParentWorker.ets`,子Worker文件为`ChildWorker.ets`。父Worker负责分发克隆任务并判断任务全部完成后关闭子Worker与父Worker;子Worker负责接收任务并执行数据克隆操作,并在任务完成后通知父Worker。 25 26 ```ts 27 // ParentWorker.ets 28 import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker, collections, ArkTSUtils } from '@kit.ArkTS' 29 import { CopyEntry } from './CopyEntry' 30 31 const workerPort: ThreadWorkerGlobalScope = worker.workerPort; 32 33 // 计算worker1的任务数量 34 let count1 = 0; 35 // 计算worker2的任务数量 36 let count2 = 0; 37 // 计算总任务数量 38 let sum = 0; 39 // 异步锁 40 const asyncLock = new ArkTSUtils.locks.AsyncLock(); 41 // 创建子Worker 42 const copyWorker1 = new worker.ThreadWorker('entry/ets/pages/ChildWorker'); 43 const copyWorker2 = new worker.ThreadWorker('entry/ets/pages/ChildWorker'); 44 45 workerPort.onmessage = (e : MessageEvents) => { 46 let array = e.data as collections.Array<CopyEntry>; 47 sum = array.length; 48 for (let i = 0; i < array.length; i++) { 49 let entry = array[i]; 50 if (entry.type === 'copy1') { 51 count1++; 52 // 如果是copy1类型,则将数据传递给 copyWorker1 53 copyWorker1.postMessageWithSharedSendable(entry); 54 } else if (entry.type === 'copy2') { 55 count2++; 56 // 如果是copy2类型,则将数据传递给 copyWorker2 57 copyWorker2.postMessageWithSharedSendable(entry); 58 } 59 } 60 } 61 62 copyWorker1.onmessage = async (e : MessageEvents) => { 63 console.info('copyWorker1 onmessage:' + e.data); 64 await asyncLock.lockAsync(() => { 65 count1--; 66 if (count1 == 0) { 67 // 如果copyWorker1的任务全部完成,则关闭copyWorker1 68 console.info('copyWorker1 close'); 69 copyWorker1.terminate(); 70 } 71 sum--; 72 if (sum == 0) { 73 // 如果所有任务全部完成,则关闭父Worker 74 workerPort.close(); 75 } 76 }) 77 } 78 79 copyWorker2.onmessage = async (e : MessageEvents) => { 80 console.info('copyWorker2 onmessage:' + e.data); 81 await asyncLock.lockAsync(() => { 82 count2--; 83 sum--; 84 if (count2 == 0) { 85 // 如果copyWorker2的任务全部完成,则关闭copyWorker2 86 console.info('copyWorker2 close') 87 copyWorker2.terminate(); 88 } 89 if (sum == 0) { 90 // 如果所有任务全部完成,则关闭父Worker 91 workerPort.close(); 92 } 93 }) 94 } 95 96 workerPort.onmessageerror = (e : MessageEvents) => { 97 console.info('onmessageerror:' + e.data); 98 } 99 100 workerPort.onerror = (e : ErrorEvent) => { 101 console.info('onerror:' + e.message); 102 } 103 ``` 104 ```ts 105 // ChildWorker.ets 106 import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker} from '@kit.ArkTS' 107 import { CopyEntry } from './CopyEntry' 108 109 const workerPort: ThreadWorkerGlobalScope = worker.workerPort; 110 111 workerPort.onmessage = (e : MessageEvents) => { 112 let data = e.data as CopyEntry; 113 // 中间copy操作省略 114 console.info(data.filePath); 115 workerPort.postMessageWithSharedSendable("done"); 116 } 117 118 workerPort.onmessageerror = (e : MessageEvents) => { 119 console.info('onmessageerror:' + e.data); 120 } 121 122 workerPort.onerror = (e : ErrorEvent) => { 123 console.info('onerror:' + e.message); 124 } 125 ``` 126 1273. 在UI主进程页面,创建父Worker并准备克隆任务所需的数据,准备完成后将数据发送给父Worker。 128 129 ```ts 130 // Index.ets 131 import { worker, collections } from '@kit.ArkTS'; 132 import { BusinessError } from '@kit.BasicServicesKit'; 133 import { CopyEntry } from './CopyEntry' 134 135 function promiseCase() { 136 let p: Promise<void> = new Promise<void>((resolve: Function, reject: Function) => { 137 setTimeout(() => { 138 resolve(1); 139 }, 100) 140 }).then(undefined, (error: BusinessError) => { 141 }) 142 return p; 143 } 144 145 async function postMessageTest() { 146 let ss = new worker.ThreadWorker("entry/ets/pages/ParentWorker"); 147 let isTerminate = false; 148 ss.onexit = () => { 149 isTerminate = true; 150 } 151 let array = new collections.Array<CopyEntry>(); 152 // 准备数据 153 for (let i = 0; i < 4; i++) { 154 if (i % 2 == 0) { 155 array.push(new CopyEntry("copy1", "file://copy1.txt")); 156 } else { 157 array.push(new CopyEntry("copy2", "file://copy2.txt")); 158 } 159 } 160 // 给Worker线程发送消息 161 ss.postMessageWithSharedSendable(array); 162 while (!isTerminate) { 163 await promiseCase(); 164 } 165 console.info("Worker线程已退出"); 166 } 167 168 @Entry 169 @Component 170 struct Index { 171 @State message: string = 'Hello World'; 172 build() { 173 Row() { 174 Column() { 175 Text(this.message) 176 .fontSize(50) 177 .fontWeight(FontWeight.Bold) 178 .onClick(() => { 179 postMessageTest(); 180 }) 181 } 182 .width('100%') 183 } 184 .height('100%') 185 } 186 } 187 ```