• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# 多级Worker间高性能消息通信
2<!--Kit: ArkTS-->
3<!--Subsystem: CommonLibrary-->
4<!--Owner: @lijiamin2025-->
5<!--Designer: @weng-changcheng-->
6<!--Tester: @kirl75; @zsw_zhushiwei-->
7<!--Adviser: @ge-yafang-->
8
9多级[Worker](worker-introduction.md)(即通过父Worker创建子Worker的机制形成层级线程关系)间通信是一种常见的需求,由于Worker线程生命周期由用户自行管理,因此需要注意多级Worker生命周期的正确管理,建议开发者确保销毁父Worker前先销毁所有子Worker。
10
11本文介绍如何在多级Worker间实现高性能消息通信。高性能消息通信的关键在于[Sendable对象](arkts-sendable.md),结合[postMessageWithSharedSendable接口](../reference/apis-arkts/js-apis-worker.md#postmessagewithsharedsendable12),可以实现线程间高性能的对象传递。例如,在数据克隆场景中,假设有一个父Worker和两个子Worker。父Worker负责创建子Worker,并向子Worker发送数据克隆任务。子Worker接收任务并执行数据克隆操作,完成后将克隆结果返回给父Worker。
12
131. 准备一个Sendable类CopyEntry,封装克隆任务数据。
14
15   ```ts
16   // CopyEntry.ets
17   @Sendable
18   export class CopyEntry {
19     // 克隆类型
20     type: string;
21     // 文件路径
22     filePath: string;
23     constructor(type: string, filePath: string) {
24       this.type = type;
25       this.filePath = filePath;
26     }
27   }
28   ```
29
302. 创建两个Worker文件,DevEco Studio支持一键生成Worker,在对应的{moduleName}目录下任意位置,单击鼠标右键 &gt; New &gt; Worker,即可自动生成Worker的模板文件及配置信息。本文以创建“ParentWorker”(父Worker)和“ChildWorker”(子Worker)为例。父Worker负责分发克隆任务并判断任务全部完成后关闭子Worker与父Worker;子Worker负责接收任务并执行数据克隆操作,并在任务完成后通知父Worker。
31
32   ```ts
33   // ParentWorker.ets
34   import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker, collections, ArkTSUtils } from '@kit.ArkTS'
35   import { CopyEntry } from './CopyEntry'
36
37   const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
38
39   // 计算worker1的任务数量
40   let count1 = 0;
41   // 计算worker2的任务数量
42   let count2 = 0;
43   // 计算总任务数量
44   let sum = 0;
45   // 异步锁
46   const asyncLock = new ArkTSUtils.locks.AsyncLock();
47   // 创建子Worker
48   const copyWorker1 = new worker.ThreadWorker('entry/ets/pages/ChildWorker.ets');
49   const copyWorker2 = new worker.ThreadWorker('entry/ets/pages/ChildWorker.ets');
50
51   workerPort.onmessage = (e : MessageEvents) => {
52     let array = e.data as collections.Array<CopyEntry>;
53     sum = array.length;
54     for (let i = 0; i < array.length; i++) {
55       let entry = array[i];
56       if (entry.type === 'copy1') {
57         count1++;
58         // 如果是copy1类型,则将数据传递给 copyWorker1
59         copyWorker1.postMessageWithSharedSendable(entry);
60       } else if (entry.type === 'copy2') {
61         count2++;
62         // 如果是copy2类型,则将数据传递给 copyWorker2
63         copyWorker2.postMessageWithSharedSendable(entry);
64       }
65     }
66   }
67
68   copyWorker1.onmessage = async (e : MessageEvents) => {
69     console.info('copyWorker1 onmessage:' + e.data);
70     await asyncLock.lockAsync(() => {
71       count1--;
72       if (count1 == 0) {
73         // 如果copyWorker1的任务全部完成,则关闭copyWorker1
74         console.info('copyWorker1 close');
75         copyWorker1.terminate();
76       }
77       sum--;
78       if (sum == 0) {
79         // 如果所有任务全部完成,则关闭父Worker
80         workerPort.close();
81       }
82     })
83   }
84
85   copyWorker2.onmessage = async (e : MessageEvents) => {
86     console.info('copyWorker2 onmessage:' + e.data);
87     await asyncLock.lockAsync(() => {
88       count2--;
89       sum--;
90       if (count2 == 0) {
91         // 如果copyWorker2的任务全部完成,则关闭copyWorker2
92         console.info('copyWorker2 close')
93         copyWorker2.terminate();
94       }
95       if (sum == 0) {
96         // 如果所有任务全部完成,则关闭父Worker
97         workerPort.close();
98       }
99     })
100   }
101
102   workerPort.onmessageerror = (e : MessageEvents) => {
103     console.error('onmessageerror:' + e.data);
104   }
105
106   workerPort.onerror = (e : ErrorEvent) => {
107     console.error('onerror:' + e.message);
108   }
109   ```
110   ```ts
111   // ChildWorker.ets
112   import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker} from '@kit.ArkTS'
113   import { CopyEntry } from './CopyEntry'
114
115   const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
116
117   workerPort.onmessage = (e : MessageEvents) => {
118     let data = e.data as CopyEntry;
119     // 中间copy操作省略
120     console.info(data.filePath);
121     workerPort.postMessageWithSharedSendable("done");
122   }
123
124   workerPort.onmessageerror = (e : MessageEvents) => {
125     console.error('onmessageerror:' + e.data);
126   }
127
128   workerPort.onerror = (e : ErrorEvent) => {
129     console.error('onerror:' + e.message);
130   }
131   ```
132
1333. 在UI主线程页面,创建父Worker并准备克隆任务所需的数据,准备完成后将数据发送给父Worker。
134
135   ```ts
136   // Index.ets
137   import { worker, collections } from '@kit.ArkTS';
138   import { CopyEntry } from './CopyEntry'
139
140   function promiseCase() {
141     let p: Promise<void> = new Promise<void>((resolve: Function, reject: Function) => {
142       setTimeout(() => {
143         resolve();
144       }, 100);
145     });
146     return p;
147   }
148
149   async function postMessageTest() {
150     let ss = new worker.ThreadWorker("entry/ets/pages/ParentWorker.ets");
151     let isTerminate = false;
152     ss.onexit = () => {
153       isTerminate = true;
154     }
155     let array = new collections.Array<CopyEntry>();
156     // 准备数据
157     for (let i = 0; i < 4; i++) {
158       if (i % 2 == 0) {
159         array.push(new CopyEntry("copy1", "file://copy1.txt"));
160       } else {
161         array.push(new CopyEntry("copy2", "file://copy2.txt"));
162       }
163     }
164     // 给Worker线程发送消息
165     ss.postMessageWithSharedSendable(array);
166     while (!isTerminate) {
167       await promiseCase();
168     }
169     console.info("Worker线程已退出");
170   }
171
172   @Entry
173   @Component
174   struct Index {
175     @State message: string = 'Hello World';
176     build() {
177       Row() {
178         Column() {
179           Text(this.message)
180             .fontSize(50)
181             .fontWeight(FontWeight.Bold)
182             .onClick(() => {
183               postMessageTest();
184             })
185         }
186         .width('100%')
187       }
188       .height('100%')
189     }
190   }
191   ```