1# 多线程并发概述 2 3多线程并发是指在单个程序中同时运行多个线程,通过并行或交替执行任务来提升性能和资源利用率的编程模型。在ArkTS应用开发过程中,需要用到多线程并发的业务场景有很多,针对常见的业务场景,主要可以分为以下三类,更详细的使用请参考[多线程开发实践案例](batch-database-operations-guide.md)。 4 5- 业务逻辑包含较大计算量或多次I/O读写等需要长时间执行的任务,例如图片/视频编解码,压缩/解压缩,数据库操作等场景。 6 7- 业务逻辑包含监听或定期采集数据等需要长时间保持运行的任务,例如定期采集传感器数据场景。 8 9- 业务逻辑跟随主线程生命周期或与主线程绑定的任务,例如游戏中台场景。 10 11 12并发模型用于实现不同应用场景中的并发任务。常见的并发模型有基于内存共享的模型和基于消息通信的模型。 13 14Actor并发模型是基于消息通信的并发模型的典型代表。它使开发者无需处理锁带来的复杂问题,并且具有较高的并发度,因此得到了广泛的应用。 15 16当前ArkTS提供了TaskPool和Worker两种并发能力,两者均基于Actor并发模型实现。 17 18Actor并发模型和内存共享并发模型的具体对比请见[多线程并发模型](#多线程并发模型)。 19 20## 多线程并发模型 21 22内存共享并发模型指多线程同时执行任务,这些线程依赖同一内存并且都有权限访问,线程访问内存前需要抢占并锁定内存的使用权,没有抢占到内存的线程需要等待其他线程释放使用权再执行。 23 24Actor并发模型每一个线程都是一个独立Actor,每个Actor有自己独立的内存,Actor之间通过消息传递机制触发对方Actor的行为,不同Actor之间不能直接访问对方的内存空间。 25 26Actor并发模型相较于内存共享并发模型,不同线程间的内存是隔离的,因此不会出现线程竞争同一内存资源的情况。开发者无需处理内存上锁相关的问题,从而提高开发效率。 27 28Actor并发模型中,线程不共享内存,需通过线程间通信机制传递任务和结果。 29 30本文以经典的生产者消费者问题为例,对比呈现这两种模型在解决具体问题时的差异。 31 32### 内存共享模型 33 34以下示例伪代码和示意图展示了如何使用内存共享模型解决生产者消费者问题。 35 36 37 38为了避免不同生产者或消费者同时访问一块共享内存的容器时产生的脏读、脏写现象,同一时间只能有一个生产者或消费者访问该容器,也就是不同生产者和消费者争夺使用容器的锁。当一个角色获取锁之后其他角色需要等待该角色释放锁之后才能重新尝试获取锁以访问该容器。 39 40```ts 41// 此段示例为伪代码仅作为逻辑示意,便于开发者理解使用内存共享模型和Actor模型的区别 42class Queue { 43 // ... 44 push(value: number) {} 45 46 empty(): boolean { 47 // ... 48 return true 49 } 50 51 pop(value: number) :number { 52 // ... 53 return value; 54 } 55} 56 57class Mutex { 58 // ... 59 lock(): boolean { 60 // ... 61 return true; 62 } 63 64 unlock() { 65 66 } 67} 68class BufferQueue { 69 queue: Queue = new Queue() 70 mutex: Mutex = new Mutex() 71 add(value: number) { 72 // 尝试获取锁 73 if (this.mutex.lock()) { 74 this.queue.push(value) 75 this.mutex.unlock() 76 } 77 } 78 79 take(value: number): number { 80 let res: number = 0; 81 // 尝试获取锁 82 if (this.mutex.lock()) { 83 if (this.queue.empty()) { 84 res = 1; 85 } 86 let num: number = this.queue.pop(value) 87 this.mutex.unlock() 88 res = num; 89 } 90 return res; 91 } 92} 93 94// 构造一段全局共享的内存 95let g_bufferQueue = new BufferQueue() 96 97class Producer { 98 constructor() { 99 } 100 run() { 101 let value = Math.random() 102 // 跨线程访问bufferQueue对象 103 g_bufferQueue.add(value) 104 } 105} 106 107class ConsumerTest { 108 constructor() { 109 } 110 run() { 111 // 跨线程访问bufferQueue对象 112 let num = 123; 113 let res = g_bufferQueue.take(num) 114 if (res != null) { 115 // 添加消费逻辑 116 } 117 } 118} 119 120function Main(): void { 121 let consumer: ConsumerTest = new ConsumerTest() 122 let producer1: Producer = new Producer() 123 for (let i = 0;i < 0;i++) { 124 // 模拟启动多线程执行生产任务 125 // let thread = new Thread() 126 // thread.run(producer.run()) 127 // consumer.run() 128 } 129} 130``` 131 132 133### Actor模型 134 135以下示例简单展示了如何使用基于Actor模型的TaskPool并发能力来解决生产者消费者问题。 136 137 138 139Actor模型中,不同角色之间并不共享内存,生产者线程和UI线程都有自己的虚拟机实例,两个虚拟机实例之间拥有独占的内存,相互隔离。生产者生产出结果后,通过序列化通信将结果发送给UI线程,UI线程消费结果后再发送新的生产任务给生产者线程。 140 141```ts 142import { taskpool } from '@kit.ArkTS'; 143 144// 跨线程并发任务 145@Concurrent 146async function produce(): Promise<number> { 147 // 添加生产相关逻辑 148 console.info("producing..."); 149 return Math.random(); 150} 151 152class Consumer { 153 public consume(value: Object) { 154 // 添加消费相关逻辑 155 console.info("consuming value: " + value); 156 } 157} 158 159@Entry 160@Component 161struct Index { 162 @State message: string = 'Hello World' 163 164 build() { 165 Row() { 166 Column() { 167 Text(this.message) 168 .fontSize(50) 169 .fontWeight(FontWeight.Bold) 170 Button() { 171 Text("start") 172 }.onClick(() => { 173 let produceTask: taskpool.Task = new taskpool.Task(produce); 174 let consumer: Consumer = new Consumer(); 175 for (let index: number = 0; index < 10; index++) { 176 // 执行生产异步并发任务 177 taskpool.execute(produceTask).then((res: Object) => { 178 consumer.consume(res); 179 }).catch((e: Error) => { 180 console.error(e.message); 181 }) 182 } 183 }) 184 .width('20%') 185 .height('20%') 186 } 187 .width('100%') 188 } 189 .height('100%') 190 } 191} 192``` 193 194也可以等待生产者完成所有生产任务,通过序列化通信将结果发送给UI线程。UI线程接收完毕后,由消费者统一消费结果。 195 196```ts 197import { taskpool } from '@kit.ArkTS'; 198 199// 跨线程并发任务 200@Concurrent 201async function produce(): Promise<number> { 202 // 添加生产相关逻辑 203 console.info("producing..."); 204 return Math.random(); 205} 206 207class Consumer { 208 public consume(value: Object) { 209 // 添加消费相关逻辑 210 console.info("consuming value: " + value); 211 } 212} 213 214@Entry 215@Component 216struct Index { 217 @State message: string = 'Hello World' 218 219 build() { 220 Row() { 221 Column() { 222 Text(this.message) 223 .fontSize(50) 224 .fontWeight(FontWeight.Bold) 225 Button() { 226 Text("start") 227 }.onClick(async () => { 228 let dataArray = new Array<number>(); 229 let produceTask: taskpool.Task = new taskpool.Task(produce); 230 let consumer: Consumer = new Consumer(); 231 for (let index: number = 0; index < 10; index++) { 232 // 执行生产异步并发任务 233 let result = await taskpool.execute(produceTask) as number; 234 dataArray.push(result); 235 } 236 for (let index: number = 0; index < dataArray.length; index++) { 237 consumer.consume(dataArray[index]); 238 } 239 }) 240 .width('20%') 241 .height('20%') 242 } 243 .width('100%') 244 } 245 .height('100%') 246 } 247} 248``` 249 250## TaskPool和Worker 251 252ArkTS提供了TaskPool和Worker两种并发能力供开发者选择,各自的运作机制和注意事项请见[TaskPool简介](taskpool-introduction.md)和[Worker简介](worker-introduction.md),两者之间实现的特点和适用场景也存在差异,请见[TaskPool和Worker的对比](taskpool-vs-worker.md)。