• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# 多级Worker间高性能消息通信
2
3多级[Worker](worker-introduction.md)(即通过父Worker创建子Worker的机制形成层级线程关系)间通信是一种常见的需求,由于Worker线程生命周期由用户自行管理,因此需要注意多级Worker生命周期的正确管理,建议开发者确保销毁父Worker前先销毁所有子Worker。
4
5本文介绍如何在多级Worker间实现高性能消息通信,高性能消息通信的关键在于[Sendable对象](arkts-sendable.md),结合Worker的[postMessageWithSharedSendable接口](../reference/apis-arkts/js-apis-worker.md#postmessagewithsharedsendable12),可以实现线程间高性能的对象传递。以数据克隆场景为例,假设有三个Worker,一个父Worker和两个子Worker,父Worker负责创建子Worker,并向子Worker发送数据克隆任务,子Worker负责接收任务并执行数据克隆操作,完成后将克隆结果返回给父Worker。
6
71. 准备一个Sendable类CopyEntry,用于封装克隆任务数据。
8
9   ```ts
10   // CopyEntry.ets
11   @Sendable
12   export class CopyEntry {
13     // 克隆类型
14     type: string;
15     // 文件路径
16     filePath: string;
17     constructor(type: string, filePath: string) {
18       this.type = type;
19       this.filePath = filePath;
20     }
21   }
22   ```
23
242. 准备两个Worker文件,父Worker文件为`ParentWorker.ets`,子Worker文件为`ChildWorker.ets`。父Worker负责分发克隆任务并判断任务全部完成后关闭子Worker与父Worker;子Worker负责接收任务并执行数据克隆操作,并在任务完成后通知父Worker。
25
26   ```ts
27   // ParentWorker.ets
28   import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker, collections, ArkTSUtils } from '@kit.ArkTS'
29   import { CopyEntry } from './CopyEntry'
30
31   const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
32
33   // 计算worker1的任务数量
34   let count1 = 0;
35   // 计算worker2的任务数量
36   let count2 = 0;
37   // 计算总任务数量
38   let sum = 0;
39   // 异步锁
40   const asyncLock = new ArkTSUtils.locks.AsyncLock();
41   // 创建子Worker
42   const copyWorker1 = new worker.ThreadWorker('entry/ets/pages/ChildWorker');
43   const copyWorker2 = new worker.ThreadWorker('entry/ets/pages/ChildWorker');
44
45   workerPort.onmessage = (e : MessageEvents) => {
46     let array = e.data as collections.Array<CopyEntry>;
47     sum = array.length;
48     for (let i = 0; i < array.length; i++) {
49       let entry = array[i];
50       if (entry.type === 'copy1') {
51         count1++;
52         // 如果是copy1类型,则将数据传递给 copyWorker1
53         copyWorker1.postMessageWithSharedSendable(entry);
54       } else if (entry.type === 'copy2') {
55         count2++;
56         // 如果是copy2类型,则将数据传递给 copyWorker2
57         copyWorker2.postMessageWithSharedSendable(entry);
58       }
59     }
60   }
61
62   copyWorker1.onmessage = async (e : MessageEvents) => {
63     console.info('copyWorker1 onmessage:' + e.data);
64     await asyncLock.lockAsync(() => {
65       count1--;
66       if (count1 == 0) {
67         // 如果copyWorker1的任务全部完成,则关闭copyWorker1
68         console.info('copyWorker1 close');
69         copyWorker1.terminate();
70       }
71       sum--;
72       if (sum == 0) {
73         // 如果所有任务全部完成,则关闭父Worker
74         workerPort.close();
75       }
76     })
77   }
78
79   copyWorker2.onmessage = async (e : MessageEvents) => {
80     console.info('copyWorker2 onmessage:' + e.data);
81     await asyncLock.lockAsync(() => {
82       count2--;
83       sum--;
84       if (count2 == 0) {
85         // 如果copyWorker2的任务全部完成,则关闭copyWorker2
86         console.info('copyWorker2 close')
87         copyWorker2.terminate();
88       }
89       if (sum == 0) {
90         // 如果所有任务全部完成,则关闭父Worker
91         workerPort.close();
92       }
93     })
94   }
95
96   workerPort.onmessageerror = (e : MessageEvents) => {
97     console.info('onmessageerror:' + e.data);
98   }
99
100   workerPort.onerror = (e : ErrorEvent) => {
101     console.info('onerror:' + e.message);
102   }
103   ```
104   ```ts
105   // ChildWorker.ets
106   import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker} from '@kit.ArkTS'
107   import { CopyEntry } from './CopyEntry'
108
109   const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
110
111   workerPort.onmessage = (e : MessageEvents) => {
112     let data = e.data as CopyEntry;
113     // 中间copy操作省略
114     console.info(data.filePath);
115     workerPort.postMessageWithSharedSendable("done");
116   }
117
118   workerPort.onmessageerror = (e : MessageEvents) => {
119     console.info('onmessageerror:' + e.data);
120   }
121
122   workerPort.onerror = (e : ErrorEvent) => {
123     console.info('onerror:' + e.message);
124   }
125   ```
126
1273. 在UI主进程页面,创建父Worker并准备克隆任务所需的数据,准备完成后将数据发送给父Worker。
128
129   ```ts
130   // Index.ets
131   import { worker, collections } from '@kit.ArkTS';
132   import { BusinessError } from '@kit.BasicServicesKit';
133   import { CopyEntry } from './CopyEntry'
134
135   function promiseCase() {
136     let p: Promise<void> = new Promise<void>((resolve: Function, reject: Function) => {
137       setTimeout(() => {
138         resolve(1);
139       }, 100)
140     }).then(undefined, (error: BusinessError) => {
141     })
142     return p;
143   }
144
145   async function postMessageTest() {
146     let ss = new worker.ThreadWorker("entry/ets/pages/ParentWorker");
147     let isTerminate = false;
148     ss.onexit = () => {
149       isTerminate = true;
150     }
151     let array = new collections.Array<CopyEntry>();
152     // 准备数据
153     for (let i = 0; i < 4; i++) {
154       if (i % 2 == 0) {
155         array.push(new CopyEntry("copy1", "file://copy1.txt"));
156       } else {
157         array.push(new CopyEntry("copy2", "file://copy2.txt"));
158       }
159     }
160     // 给Worker线程发送消息
161     ss.postMessageWithSharedSendable(array);
162     while (!isTerminate) {
163       await promiseCase();
164     }
165     console.info("Worker线程已退出");
166   }
167
168   @Entry
169   @Component
170   struct Index {
171     @State message: string = 'Hello World';
172     build() {
173       Row() {
174         Column() {
175           Text(this.message)
176             .fontSize(50)
177             .fontWeight(FontWeight.Bold)
178             .onClick(() => {
179               postMessageTest();
180             })
181         }
182         .width('100%')
183       }
184       .height('100%')
185     }
186   }
187   ```