1# 多线程并发概述 2 3并发模型是用来实现不同应用场景中并发任务的编程模型,常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。 4 5Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,因此得到了广泛的支持和使用。 6 7当前ArkTS提供了TaskPool和Worker两种并发能力,TaskPool和Worker都基于Actor并发模型实现。 8 9Actor并发模型和内存共享并发模型的具体对比请见多线程并发模型 10 11## 多线程并发模型 12 13内存共享并发模型指多线程同时执行任务,这些线程依赖同一内存并且都有权限访问,线程访问内存前需要抢占并锁定内存的使用权,没有抢占到内存的线程需要等待其他线程释放使用权再执行。 14 15Actor并发模型每一个线程都是一个独立Actor,每个Actor有自己独立的内存,Actor之间通过消息传递机制触发对方Actor的行为,不同Actor之间不能直接访问对方的内存空间。 16 17Actor并发模型对比内存共享并发模型的优势在于不同线程间内存隔离,不会产生不同线程竞争同一内存资源的问题。开发者不需要考虑对内存上锁导致的一系列功能、性能问题,提升了开发效率。 18 19由于Actor并发模型线程之间不共享内存,需要通过线程间通信机制传输并发任务和任务结果。 20 21本文以经典的生产者消费者问题为例,对比呈现这两种模型在解决具体问题时的差异。 22 23### 内存共享模型 24 25以下示例伪代码展示了如何使用内存共享模型解决生产者消费者问题。 26 27为了避免不同生产者或消费者同时访问一块共享内存的容器时产生的脏读、脏写现象,同一时间只能有一个生产者或消费者访问该容器,也就是不同生产者和消费者争夺使用容器的锁。当一个角色获取锁之后其他角色需要等待该角色释放锁之后才能重新尝试获取锁以访问该容器。 28 29```ts 30// 此段示例为伪代码仅作为逻辑示意,便于开发者理解使用内存共享模型和Actor模型的区别 31class Queue { 32 // ... 33 push(value: number) {} 34 35 empty(): boolean { 36 // ... 37 return true 38 } 39 40 pop(value: number) :number { 41 // ... 42 return value; 43 } 44} 45 46class Mutex { 47 // ... 48 lock(): boolean { 49 // ... 50 return true; 51 } 52 53 unlock() { 54 55 } 56} 57class BufferQueue { 58 queue: Queue = new Queue() 59 mutex: Mutex = new Mutex() 60 add(value: number) { 61 // 尝试获取锁 62 if (this.mutex.lock()) { 63 this.queue.push(value) 64 this.mutex.unlock() 65 } 66 } 67 68 take(value: number): number { 69 let res: number = 0; 70 // 尝试获取锁 71 if (this.mutex.lock()) { 72 if (this.queue.empty()) { 73 res = 1; 74 } 75 let num: number = this.queue.pop(value) 76 this.mutex.unlock() 77 res = num; 78 } 79 return res; 80 } 81} 82 83// 构造一段全局共享的内存 84let g_bufferQueue = new BufferQueue() 85 86class Producer { 87 constructor() { 88 } 89 run() { 90 let value = Math.random() 91 // 跨线程访问bufferQueue对象 92 g_bufferQueue.add(value) 93 } 94} 95 96class ConsumerTest { 97 constructor() { 98 } 99 run() { 100 // 跨线程访问bufferQueue对象 101 let num = 123; 102 let res = g_bufferQueue.take(num) 103 if (res != null) { 104 // 添加消费逻辑 105 } 106 } 107} 108 109function Main(): void { 110 let consumer: ConsumerTest = new ConsumerTest() 111 let producer1: Producer = new Producer() 112 for (let i = 0;i < 0;i++) { 113 // 模拟启动多线程执行生产任务 114 // let thread = new Thread() 115 // thread.run(producer.run()) 116 // consumer.run() 117 } 118} 119``` 120 121 122### Actor模型 123 124Actor模型不同角色之间并不共享内存,生产者线程和UI线程都有自己的虚拟机实例,两个虚拟机实例之间拥有独占的内存,相互隔离。生产者生产出结果后通过序列化通信将结果发送给UI线程,UI线程消费结果后再发送新的生产任务给生产者线程。 125 126```ts 127import { taskpool } from '@kit.ArkTS'; 128 129// 跨线程并发任务 130@Concurrent 131async function produce(): Promise<number> { 132 // 添加生产相关逻辑 133 console.info("producing..."); 134 return Math.random(); 135} 136 137class Consumer { 138 public consume(value: Object) { 139 // 添加消费相关逻辑 140 console.info("consuming value: " + value); 141 } 142} 143 144@Entry 145@Component 146struct Index { 147 @State message: string = 'Hello World' 148 149 build() { 150 Row() { 151 Column() { 152 Text(this.message) 153 .fontSize(50) 154 .fontWeight(FontWeight.Bold) 155 Button() { 156 Text("start") 157 }.onClick(() => { 158 let produceTask: taskpool.Task = new taskpool.Task(produce); 159 let consumer: Consumer = new Consumer(); 160 for (let index: number = 0; index < 10; index++) { 161 // 执行生产异步并发任务,执行生产异步并发任务 162 taskpool.execute(produceTask).then((res: Object) => { 163 consumer.consume(res); 164 }).catch((e: Error) => { 165 console.error(e.message); 166 }) 167 } 168 }) 169 .width('20%') 170 .height('20%') 171 } 172 .width('100%') 173 } 174 .height('100%') 175 } 176} 177``` 178<!--@[actor_model](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTs/ArkTsConcurrent/MultithreadedConcurrency/MultiThreadConcurrencyOverview/entry/src/main/ets/pages/Index.ets)--> 179 180同时,也可等待生产者的生产任务全部执行完成,通过序列化通信将结果发送给UI线程,UI线程接收完毕后再由消费者统一消费结果。 181 182```ts 183import { taskpool } from '@kit.ArkTS'; 184 185// 跨线程并发任务 186@Concurrent 187async function produce(): Promise<number> { 188 // 添加生产相关逻辑 189 console.info("producing..."); 190 return Math.random(); 191} 192 193class Consumer { 194 public consume(value: Object) { 195 // 添加消费相关逻辑 196 console.info("consuming value: " + value); 197 } 198} 199 200@Entry 201@Component 202struct Index { 203 @State message: string = 'Hello World' 204 205 build() { 206 Row() { 207 Column() { 208 Text(this.message) 209 .fontSize(50) 210 .fontWeight(FontWeight.Bold) 211 Button() { 212 Text("start") 213 }.onClick(async () => { 214 let dataArray = new Array<number>(); 215 let produceTask: taskpool.Task = new taskpool.Task(produce); 216 let consumer: Consumer = new Consumer(); 217 for (let index: number = 0; index < 10; index++) { 218 // 执行生产异步并发任务 219 let result = await taskpool.execute(produceTask) as number; 220 dataArray.push(result); 221 } 222 for (let index: number = 0; index < dataArray.length; index++) { 223 consumer.consume(dataArray[index]); 224 } 225 }) 226 .width('20%') 227 .height('20%') 228 } 229 .width('100%') 230 } 231 .height('100%') 232 } 233} 234``` 235 236## TaskPool和Worker 237 238ArkTS提供了TaskPool和Worker两种并发能力供开发者选择,各自的运作机制和注意事项请见TaskPool简介和Worker简介,两者之间实现的特点和适用场景也存在差异,请见TaskPool和Worker的对比。