• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# 多线程并发概述
2<!--Kit: ArkTS-->
3<!--Subsystem: CommonLibrary-->
4<!--Owner: @wang_zhaoyong-->
5<!--Designer: @weng-changcheng-->
6<!--Tester: @kirl75; @zsw_zhushiwei-->
7<!--Adviser: @ge-yafang-->
8
9多线程并发是指在单个程序中同时运行多个线程,通过并行或交替执行任务来提升性能和资源利用率的编程模型。在ArkTS应用开发中,多线程并发适用于多种业务场景,常见的业务场景主要分为以下三类,更详细的使用请参考[多线程开发实践案例](batch-database-operations-guide.md)。
10
11- 业务逻辑包含大量计算或频繁的I/O读写等需要长时间执行的任务,例如图片和视频的编解码、文件的压缩与解压缩、数据库操作等场景。
12
13- 业务逻辑包括监听和定期采集数据等需要长时间保持运行的任务,例如定期采集传感器数据的场景。
14
15- 业务逻辑跟随主线程的生命周期,或与主线程绑定的任务,例如在游戏中的业务场景。
16
17
18并发模型用于实现不同应用场景中的并发任务。常见的并发模型有基于内存共享的模型和基于消息通信的模型。
19
20Actor并发模型是基于消息通信的典型并发模型。开发者无需处理锁带来的复杂问题,且具备高并发度,因此应用广泛。
21
22当前ArkTS提供了TaskPool和Worker两种并发能力,两者均基于Actor并发模型实现。
23
24Actor并发模型和内存共享并发模型的具体对比请见[多线程并发模型](#多线程并发模型)。
25
26## 多线程并发模型
27
28内存共享并发模型指多线程同时执行任务,这些线程依赖同一内存资源并且都有权限访问,线程访问内存前需要抢占并锁定内存的使用权,没有抢占到内存的线程需要等待其他线程释放使用权再执行。
29
30Actor并发模型每一个线程都是一个独立Actor,每个Actor有自己独立的内存,Actor之间通过消息传递机制触发对方Actor的行为,不同Actor之间不能直接访问对方的内存空间。
31
32Actor并发模型与内存共享并发模型相比,不同线程间的内存是隔离的,因此不会发生线程竞争同一内存资源的情况。无需处理内存上锁问题,从而提高开发效率。
33
34Actor并发模型中,不同Actor之间不共享内存,需通过消息传递机制传递任务和结果。
35
36本文以经典的生产者消费者问题为例,分析这两种模型在解决问题时的差异。
37
38### 内存共享模型
39
40以下示例伪代码和示意图展示了如何使用内存共享模型解决生产者消费者问题。
41
42![zh-cn_image_0000002001497485](figures/zh-cn_image_0000002001497485.png)
43
44为了避免不同生产者或消费者同时访问同一块共享内存容器时产生脏读、脏写现象,同一时间只能有一个生产者或消费者访问该容器。即不同生产者和消费者需争夺使用容器的锁。当一个角色获取锁后,其他角色需等待该角色释放锁,才能重新尝试获取锁以访问该容器。
45
46```ts
47// 此段示例为伪代码仅作为逻辑示意,便于开发者理解使用内存共享模型和Actor模型的区别
48class Queue {
49  // ...
50  push(value: number) {
51    // ...
52  }
53
54  empty(): boolean {
55    // ...
56    return true;
57  }
58
59  pop(value: number): number {
60    // ...
61    return value;
62  }
63  // ...
64}
65
66class Mutex {
67  // ...
68  lock(): boolean {
69    // ...
70    return true;
71  }
72
73  unlock() {
74    // ...
75  }
76  // ...
77}
78
79class BufferQueue {
80  queue: Queue = new Queue();
81  mutex: Mutex = new Mutex();
82
83  add(value: number) {
84    // 尝试获取锁
85    if (this.mutex.lock()) {
86      this.queue.push(value);
87      this.mutex.unlock();
88    }
89  }
90
91  take(value: number): number {
92    let res: number = 0;
93    // 尝试获取锁
94    if (this.mutex.lock()) {
95      if (this.queue.empty()) {
96        res = 1;
97        return res;
98      }
99      let num: number = this.queue.pop(value);
100      this.mutex.unlock();
101      res = num;
102    }
103    return res;
104  }
105}
106
107// 构造一段全局共享的内存
108let g_bufferQueue = new BufferQueue();
109
110class Producer {
111  constructor() {
112  }
113
114  run() {
115    let value = Math.random();
116    // 跨线程访问bufferQueue对象
117    g_bufferQueue.add(value);
118  }
119}
120
121class ConsumerTest {
122  constructor() {
123  }
124
125  run() {
126    // 跨线程访问bufferQueue对象
127    let num = 123;
128    let res = g_bufferQueue.take(num);
129    if (res != null) {
130      // 添加消费逻辑
131    }
132  }
133}
134
135function Main(): void {
136  let consumer: ConsumerTest = new ConsumerTest();
137  let producer: Producer = new Producer();
138  let threadNum: number = 10;
139  for (let i = 0; i < threadNum; i++) {
140    // 如下伪代码模拟启动多线程执行生产任务
141    // let thread = new Thread();
142    // thread.run(producer.run());
143    // consumer.run();
144  }
145}
146```
147
148
149### Actor模型
150
151以下示例简单展示了如何使用基于Actor模型的TaskPool并发能力来解决生产者消费者问题。
152
153![zh-cn_image_0000001964697544](figures/zh-cn_image_0000001964697544.png)
154
155Actor模型中,不同角色之间并不共享内存,生产者线程和UI线程都有自己的虚拟机实例,两个虚拟机实例之间拥有独占的内存,相互隔离。生产者生产出结果后,通过序列化通信将结果发送给UI线程。UI线程消费结果后,再发送新的生产任务给生产者线程。
156
157```ts
158import { taskpool } from '@kit.ArkTS';
159
160// 跨线程并发任务
161@Concurrent
162async function produce(): Promise<number> {
163  // 添加生产相关逻辑
164  console.info('producing...');
165  return Math.random();
166}
167
168class Consumer {
169  public consume(value: Object) {
170    // 添加消费相关逻辑
171    console.info('consuming value: ' + value);
172  }
173}
174
175@Entry
176@Component
177struct Index {
178  @State message: string = 'Hello World';
179
180  build() {
181    Row() {
182      Column() {
183        Text(this.message)
184          .fontSize(50)
185          .fontWeight(FontWeight.Bold)
186        Button() {
187          Text('start')
188        }.onClick(() => {
189          let produceTask: taskpool.Task = new taskpool.Task(produce);
190          let consumer: Consumer = new Consumer();
191          for (let index: number = 0; index < 10; index++) {
192            // 执行生产异步并发任务
193            taskpool.execute(produceTask).then((res: Object) => {
194              consumer.consume(res);
195            }).catch((e: Error) => {
196              console.error(e.message);
197            })
198          }
199        })
200        .width('20%')
201        .height('20%')
202      }
203      .width('100%')
204    }
205    .height('100%')
206  }
207}
208```
209<!-- @[actor_model](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/MultiThreadConcurrencyOverview/entry/src/main/ets/pages/Index.ets) -->
210
211也可以等待生产者完成所有任务,通过序列化通信将结果发送给UI线程。UI线程接收后,由消费者统一消费结果。
212
213```ts
214import { taskpool } from '@kit.ArkTS';
215
216// 跨线程并发任务
217@Concurrent
218async function produce(): Promise<number> {
219  // 添加生产相关逻辑
220  console.info('producing...');
221  return Math.random();
222}
223
224class Consumer {
225  public consume(value: number) {
226    // 添加消费相关逻辑
227    console.info('consuming value: ' + value);
228  }
229}
230
231@Entry
232@Component
233struct Index {
234  @State message: string = 'Hello World'
235
236  build() {
237    Row() {
238      Column() {
239        Text(this.message)
240          .fontSize(50)
241          .fontWeight(FontWeight.Bold)
242        Button() {
243          Text('start')
244        }.onClick(async () => {
245          let dataArray = new Array<number>();
246          let produceTask: taskpool.Task = new taskpool.Task(produce);
247          let consumer: Consumer = new Consumer();
248          for (let index: number = 0; index < 10; index++) {
249            // 执行生产异步并发任务
250            let result = await taskpool.execute(produceTask) as number;
251            dataArray.push(result);
252          }
253          for (let index: number = 0; index < dataArray.length; index++) {
254            consumer.consume(dataArray[index]);
255          }
256        })
257        .width('20%')
258        .height('20%')
259      }
260      .width('100%')
261    }
262    .height('100%')
263  }
264}
265```
266
267## TaskPool和Worker
268
269ArkTS提供了TaskPool和Worker两种并发能力供开发者选择,各自的运作机制和注意事项请见[TaskPool简介](taskpool-introduction.md)和[Worker简介](worker-introduction.md),两者之间实现的特点和适用场景也存在差异,请见[TaskPool和Worker的对比](taskpool-vs-worker.md)。
270
271## 并发注意事项
272
273- 避免在并发线程中操作UI
274
275  UI操作必须在主线程中执行。并发线程中操作UI可能导致界面异常或崩溃。
276
277- 数据传递需支持序列化/反序列化
278
279  并发任务间传递数据时,对象必须是可序列化的(如基本类型、普通对象等),不可传递函数、循环引用、特殊对象(如Promise、Error)等。已完成(fulfilled或rejected)状态的 Promise可以被传递,因为其结果是可序列化的。
280
281- 合理控制并发粒度
282
283  频繁创建和销毁并发任务(如Worker、Task)会带来额外性能开销,建议复用或使用任务池机制。
284
285- 注意内存泄漏风险
286
287  避免在并发任务中持有外部对象的强引用,防止内存泄漏。
288
289- 并发任务应具备独立性
290
291  并发任务应尽量不依赖外部状态,减少竞态条件(Race Condition)和同步开销。竞态条件是指多个线程或任务同时访问并修改共享数据,执行结果依赖于任务调度的顺序,可能导致数据不一致或不可预期的行为。