• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# High-Performance Communication Between Multi-Level Workers
2
Communication between multi-level [Workers](worker-introduction.md) (where a parent Worker creates child Workers, forming a hierarchy) is a common requirement. Because you are responsible for managing the lifecycle of every Worker thread, you must manage the lifecycles of multi-level Workers carefully: ensure that all child Workers are destroyed before their parent Worker is destroyed.
4
This topic describes how to implement high-performance communication between multi-level Workers. The key is to pass [Sendable objects](arkts-sendable.md) through the Worker's [postMessageWithSharedSendable](../reference/apis-arkts/js-apis-worker.md#postmessagewithsharedsendable12) interface, which provides high-performance object passing between threads. For example, in a data cloning scenario with three Workers (one parent and two children), the parent Worker creates the child Workers, sends cloning tasks to them, and receives the results back.
6
1. Prepare a Sendable class **CopyEntry** to encapsulate the cloning task data.
8
9   ```ts
10   // CopyEntry.ets
11   @Sendable
12   export class CopyEntry {
13     // Clone type.
14     type: string;
15     // File path.
16     filePath: string;
17     constructor(type: string, filePath: string) {
18       this.type = type;
19       this.filePath = filePath;
20     }
21   }
22   ```
23
2. Prepare two Worker files: **ParentWorker.ets** for the parent Worker and **ChildWorker.ets** for the child Workers. The parent Worker distributes the tasks and, once all tasks are complete, closes the child Workers and then itself. Each child Worker receives tasks, performs the cloning, and notifies the parent Worker whenever a task is complete.
25
26   ```ts
27   // ParentWorker.ets
28   import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker, collections, ArkTSUtils } from '@kit.ArkTS'
29   import { CopyEntry } from './CopyEntry'
30
31   const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
32
33   // Calculate the number of tasks of worker1.
34   let count1 = 0;
35   // Calculate the number of tasks of worker2.
36   let count2 = 0;
37   // Calculate the total number of tasks.
38   let sum = 0;
39   // Asynchronous lock.
40   const asyncLock = new ArkTSUtils.locks.AsyncLock();
41   // Create a child Worker.
42   const copyWorker1 = new worker.ThreadWorker('entry/ets/pages/ChildWorker');
43   const copyWorker2 = new worker.ThreadWorker('entry/ets/pages/ChildWorker');
44
45   workerPort.onmessage = (e : MessageEvents) => {
46     let array = e.data as collections.Array<CopyEntry>;
47     sum = array.length;
48     for (let i = 0; i < array.length; i++) {
49       let entry = array[i];
50       if (entry.type === 'copy1') {
51         count1++;
52         // If the data type is copy1, transfer the data to copyWorker1.
53         copyWorker1.postMessageWithSharedSendable(entry);
54       } else if (entry.type === 'copy2') {
55         count2++;
56         // If the data type is copy2, transfer the data to copyWorker2.
57         copyWorker2.postMessageWithSharedSendable(entry);
58       }
59     }
60   }
61
62   copyWorker1.onmessage = async (e : MessageEvents) => {
63     console.info('copyWorker1 onmessage:' + e.data);
64     await asyncLock.lockAsync(() => {
65       count1--;
66       if (count1 == 0) {
67         // If all tasks of copyWorker1 are complete, close copyWorker1.
68         console.info('copyWorker1 close');
69         copyWorker1.terminate();
70       }
71       sum--;
72       if (sum == 0) {
73         // If all tasks are complete, close the parent Worker.
74         workerPort.close();
75       }
76     })
77   }
78
79   copyWorker2.onmessage = async (e : MessageEvents) => {
80     console.info('copyWorker2 onmessage:' + e.data);
81     await asyncLock.lockAsync(() => {
82       count2--;
83       sum--;
84       if (count2 == 0) {
85         // If all tasks of copyWorker2 are complete, close copyWorker2.
86         console.info('copyWorker2 close')
87         copyWorker2.terminate();
88       }
89       if (sum == 0) {
90         // If all tasks are complete, close the parent Worker.
91         workerPort.close();
92       }
93     })
94   }
95
96   workerPort.onmessageerror = (e : MessageEvents) => {
97     console.info('onmessageerror:' + e.data);
98   }
99
100   workerPort.onerror = (e : ErrorEvent) => {
101     console.info('onerror:' + e.message);
102   }
103   ```
104   ```ts
105   // ChildWorker.ets
106   import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker} from '@kit.ArkTS'
107   import { CopyEntry } from './CopyEntry'
108
109   const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
110
111   workerPort.onmessage = (e : MessageEvents) => {
112     let data = e.data as CopyEntry;
113     // The copy operation is omitted.
114     console.info(data.filePath);
115     workerPort.postMessageWithSharedSendable("done");
116   }
117
118   workerPort.onmessageerror = (e : MessageEvents) => {
119     console.info('onmessageerror:' + e.data);
120   }
121
122   workerPort.onerror = (e : ErrorEvent) => {
123     console.info('onerror:' + e.message);
124   }
125   ```
126
3. In the UI main thread (the **Index.ets** page), create the parent Worker, prepare the data required for the cloning tasks, and send the data to the parent Worker.
128
129   ```ts
130   // Index.ets
131   import { worker, collections } from '@kit.ArkTS';
132   import { BusinessError } from '@kit.BasicServicesKit';
133   import { CopyEntry } from './CopyEntry'
134
135   function promiseCase() {
136     let p: Promise<void> = new Promise<void>((resolve: Function, reject: Function) => {
137       setTimeout(() => {
138         resolve(1);
139       }, 100)
140     }).then(undefined, (error: BusinessError) => {
141     })
142     return p;
143   }
144
145   async function postMessageTest() {
146     let ss = new worker.ThreadWorker("entry/ets/pages/ParentWorker");
147     let isTerminate = false;
148     ss.onexit = () => {
149       isTerminate = true;
150     }
151     let array = new collections.Array<CopyEntry>();
152     // Prepare data.
153     for (let i = 0; i < 4; i++) {
154       if (i % 2 == 0) {
155         array.push(new CopyEntry("copy1", "file://copy1.txt"));
156       } else {
157         array.push(new CopyEntry("copy2", "file://copy2.txt"));
158       }
159     }
160     // Send a message to the Worker thread.
161     ss.postMessageWithSharedSendable(array);
162     while (!isTerminate) {
163       await promiseCase();
164     }
165     console.info("Worker thread has exited");
166   }
167
168   @Entry
169   @Component
170   struct Index {
171     @State message: string = 'Hello World';
172     build() {
173       Row() {
174         Column() {
175           Text(this.message)
176             .fontSize(50)
177             .fontWeight(FontWeight.Bold)
178             .onClick(() => {
179               postMessageTest();
180             })
181         }
182         .width('100%')
183       }
184       .height('100%')
185     }
186   }
187   ```
188