• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# 多线程并发概述
2
3并发模型是用来实现不同应用场景中并发任务的编程模型,常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。
4
5Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,因此得到了广泛的支持和使用。
6
7当前ArkTS提供了TaskPool和Worker两种并发能力,TaskPool和Worker都基于Actor并发模型实现。
8
9Actor并发模型和内存共享并发模型的具体对比请见多线程并发模型
10
11## 多线程并发模型
12
13内存共享并发模型指多线程同时执行任务,这些线程依赖同一内存并且都有权限访问,线程访问内存前需要抢占并锁定内存的使用权,没有抢占到内存的线程需要等待其他线程释放使用权再执行。
14
15Actor并发模型每一个线程都是一个独立Actor,每个Actor有自己独立的内存,Actor之间通过消息传递机制触发对方Actor的行为,不同Actor之间不能直接访问对方的内存空间。
16
17Actor并发模型对比内存共享并发模型的优势在于不同线程间内存隔离,不会产生不同线程竞争同一内存资源的问题。开发者不需要考虑对内存上锁导致的一系列功能、性能问题,提升了开发效率。
18
19由于Actor并发模型线程之间不共享内存,需要通过线程间通信机制传输并发任务和任务结果。
20
21本文以经典的生产者消费者问题为例,对比呈现这两种模型在解决具体问题时的差异。
22
23### 内存共享模型
24
25以下示例伪代码展示了如何使用内存共享模型解决生产者消费者问题。
26
27为了避免不同生产者或消费者同时访问一块共享内存的容器时产生的脏读、脏写现象,同一时间只能有一个生产者或消费者访问该容器,也就是不同生产者和消费者争夺使用容器的锁。当一个角色获取锁之后其他角色需要等待该角色释放锁之后才能重新尝试获取锁以访问该容器。
28
29```ts
30// 此段示例为伪代码仅作为逻辑示意,便于开发者理解使用内存共享模型和Actor模型的区别
31class Queue {
32  // ...
33  push(value: number) {}
34
35  empty(): boolean {
36    // ...
37    return true
38  }
39
40  pop(value: number) :number {
41    // ...
42    return value;
43  }
44}
45
46class Mutex {
47  // ...
48  lock(): boolean {
49    // ...
50    return true;
51  }
52
53  unlock() {
54
55  }
56}
57class BufferQueue {
58  queue: Queue = new Queue()
59  mutex: Mutex = new Mutex()
60  add(value: number) {
61    // 尝试获取锁
62    if (this.mutex.lock()) {
63      this.queue.push(value)
64      this.mutex.unlock()
65    }
66  }
67
68  take(value: number): number {
69    let res: number = 0;
70    // 尝试获取锁
71    if (this.mutex.lock()) {
72      if (this.queue.empty()) {
73        res = 1;
74      }
75      let num: number = this.queue.pop(value)
76      this.mutex.unlock()
77      res = num;
78    }
79    return res;
80  }
81}
82
83// 构造一段全局共享的内存
84let g_bufferQueue = new BufferQueue()
85
86class Producer {
87  constructor() {
88  }
89  run() {
90    let value = Math.random()
91    // 跨线程访问bufferQueue对象
92    g_bufferQueue.add(value)
93  }
94}
95
96class ConsumerTest {
97  constructor() {
98  }
99  run() {
100    // 跨线程访问bufferQueue对象
101    let num = 123;
102    let res = g_bufferQueue.take(num)
103    if (res != null) {
104      // 添加消费逻辑
105    }
106  }
107}
108
109function Main(): void {
110  let consumer: ConsumerTest = new ConsumerTest()
111  let producer1: Producer = new Producer()
112  for (let i = 0;i < 0;i++) {
113    // 模拟启动多线程执行生产任务
114    // let thread = new Thread()
115    // thread.run(producer.run())
116    // consumer.run()
117  }
118}
119```
120
121
122### Actor模型
123
124Actor模型不同角色之间并不共享内存,生产者线程和UI线程都有自己的虚拟机实例,两个虚拟机实例之间拥有独占的内存,相互隔离。生产者生产出结果后通过序列化通信将结果发送给UI线程,UI线程消费结果后再发送新的生产任务给生产者线程。
125
126```ts
127import { taskpool } from '@kit.ArkTS';
128
129// 跨线程并发任务
130@Concurrent
131async function produce(): Promise<number> {
132  // 添加生产相关逻辑
133  console.info("producing...");
134  return Math.random();
135}
136
137class Consumer {
138  public consume(value: Object) {
139    // 添加消费相关逻辑
140    console.info("consuming value: " + value);
141  }
142}
143
144@Entry
145@Component
146struct Index {
147  @State message: string = 'Hello World'
148
149  build() {
150    Row() {
151      Column() {
152        Text(this.message)
153          .fontSize(50)
154          .fontWeight(FontWeight.Bold)
155        Button() {
156          Text("start")
157        }.onClick(() => {
158          let produceTask: taskpool.Task = new taskpool.Task(produce);
159          let consumer: Consumer = new Consumer();
160          for (let index: number = 0; index < 10; index++) {
161            // 执行生产异步并发任务,执行生产异步并发任务
162            taskpool.execute(produceTask).then((res: Object) => {
163              consumer.consume(res);
164            }).catch((e: Error) => {
165              console.error(e.message);
166            })
167          }
168        })
169        .width('20%')
170        .height('20%')
171      }
172      .width('100%')
173    }
174    .height('100%')
175  }
176}
177```
178<!--@[actor_model](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTs/ArkTsConcurrent/MultithreadedConcurrency/MultiThreadConcurrencyOverview/entry/src/main/ets/pages/Index.ets)-->
179
180同时,也可等待生产者的生产任务全部执行完成,通过序列化通信将结果发送给UI线程,UI线程接收完毕后再由消费者统一消费结果。
181
182```ts
183import { taskpool } from '@kit.ArkTS';
184
185// 跨线程并发任务
186@Concurrent
187async function produce(): Promise<number> {
188  // 添加生产相关逻辑
189  console.info("producing...");
190  return Math.random();
191}
192
193class Consumer {
194  public consume(value: Object) {
195    // 添加消费相关逻辑
196    console.info("consuming value: " + value);
197  }
198}
199
200@Entry
201@Component
202struct Index {
203  @State message: string = 'Hello World'
204
205  build() {
206    Row() {
207      Column() {
208        Text(this.message)
209          .fontSize(50)
210          .fontWeight(FontWeight.Bold)
211        Button() {
212          Text("start")
213        }.onClick(async () => {
214          let dataArray = new Array<number>();
215          let produceTask: taskpool.Task = new taskpool.Task(produce);
216          let consumer: Consumer = new Consumer();
217          for (let index: number = 0; index < 10; index++) {
218            // 执行生产异步并发任务
219            let result = await taskpool.execute(produceTask) as number;
220            dataArray.push(result);
221          }
222          for (let index: number = 0; index < dataArray.length; index++) {
223            consumer.consume(dataArray[index]);
224          }
225        })
226        .width('20%')
227        .height('20%')
228      }
229      .width('100%')
230    }
231    .height('100%')
232  }
233}
234```
235
236## TaskPool和Worker
237
238ArkTS提供了TaskPool和Worker两种并发能力供开发者选择,各自的运作机制和注意事项请见TaskPool简介和Worker简介,两者之间实现的特点和适用场景也存在差异,请见TaskPool和Worker的对比。