1# 多线程并发概述 2<!--Kit: ArkTS--> 3<!--Subsystem: CommonLibrary--> 4<!--Owner: @wang_zhaoyong--> 5<!--Designer: @weng-changcheng--> 6<!--Tester: @kirl75; @zsw_zhushiwei--> 7<!--Adviser: @ge-yafang--> 8 9多线程并发是指在单个程序中同时运行多个线程,通过并行或交替执行任务来提升性能和资源利用率的编程模型。在ArkTS应用开发中,多线程并发适用于多种业务场景,常见的业务场景主要分为以下三类,更详细的使用请参考[多线程开发实践案例](batch-database-operations-guide.md)。 10 11- 业务逻辑包含大量计算或频繁的I/O读写等需要长时间执行的任务,例如图片和视频的编解码、文件的压缩与解压缩、数据库操作等场景。 12 13- 业务逻辑包括监听和定期采集数据等需要长时间保持运行的任务,例如定期采集传感器数据的场景。 14 15- 业务逻辑跟随主线程的生命周期,或与主线程绑定的任务,例如在游戏中的业务场景。 16 17 18并发模型用于实现不同应用场景中的并发任务。常见的并发模型有基于内存共享的模型和基于消息通信的模型。 19 20Actor并发模型是基于消息通信的典型并发模型。开发者无需处理锁带来的复杂问题,且具备高并发度,因此应用广泛。 21 22当前ArkTS提供了TaskPool和Worker两种并发能力,两者均基于Actor并发模型实现。 23 24Actor并发模型和内存共享并发模型的具体对比请见[多线程并发模型](#多线程并发模型)。 25 26## 多线程并发模型 27 28内存共享并发模型指多线程同时执行任务,这些线程依赖同一内存资源并且都有权限访问,线程访问内存前需要抢占并锁定内存的使用权,没有抢占到内存的线程需要等待其他线程释放使用权再执行。 29 30Actor并发模型每一个线程都是一个独立Actor,每个Actor有自己独立的内存,Actor之间通过消息传递机制触发对方Actor的行为,不同Actor之间不能直接访问对方的内存空间。 31 32Actor并发模型与内存共享并发模型相比,不同线程间的内存是隔离的,因此不会发生线程竞争同一内存资源的情况。无需处理内存上锁问题,从而提高开发效率。 33 34Actor并发模型中,不同Actor之间不共享内存,需通过消息传递机制传递任务和结果。 35 36本文以经典的生产者消费者问题为例,分析这两种模型在解决问题时的差异。 37 38### 内存共享模型 39 40以下示例伪代码和示意图展示了如何使用内存共享模型解决生产者消费者问题。 41 42 43 44为了避免不同生产者或消费者同时访问同一块共享内存容器时产生脏读、脏写现象,同一时间只能有一个生产者或消费者访问该容器。即不同生产者和消费者需争夺使用容器的锁。当一个角色获取锁后,其他角色需等待该角色释放锁,才能重新尝试获取锁以访问该容器。 45 46```ts 47// 此段示例为伪代码仅作为逻辑示意,便于开发者理解使用内存共享模型和Actor模型的区别 48class Queue { 49 // ... 50 push(value: number) { 51 // ... 52 } 53 54 empty(): boolean { 55 // ... 56 return true; 57 } 58 59 pop(value: number): number { 60 // ... 61 return value; 62 } 63 // ... 64} 65 66class Mutex { 67 // ... 68 lock(): boolean { 69 // ... 70 return true; 71 } 72 73 unlock() { 74 // ... 75 } 76 // ... 77} 78 79class BufferQueue { 80 queue: Queue = new Queue(); 81 mutex: Mutex = new Mutex(); 82 83 add(value: number) { 84 // 尝试获取锁 85 if (this.mutex.lock()) { 86 this.queue.push(value); 87 this.mutex.unlock(); 88 } 89 } 90 91 take(value: number): number { 92 let res: number = 0; 93 // 尝试获取锁 94 if (this.mutex.lock()) { 95 if (this.queue.empty()) { 96 res = 1; 97 return res; 98 } 99 let num: number = this.queue.pop(value); 100 this.mutex.unlock(); 101 res = num; 102 } 103 return res; 104 } 105} 106 107// 构造一段全局共享的内存 108let g_bufferQueue = new BufferQueue(); 109 110class Producer { 111 constructor() { 112 } 113 114 run() { 115 let value = Math.random(); 116 // 跨线程访问bufferQueue对象 117 g_bufferQueue.add(value); 118 } 119} 120 121class ConsumerTest { 122 constructor() { 123 } 124 125 run() { 126 // 跨线程访问bufferQueue对象 127 let num = 123; 128 let res = g_bufferQueue.take(num); 129 if (res != null) { 130 // 添加消费逻辑 131 } 132 } 133} 134 135function Main(): void { 136 let consumer: ConsumerTest = new ConsumerTest(); 137 let producer: Producer = new Producer(); 138 let threadNum: number = 10; 139 for (let i = 0; i < threadNum; i++) { 140 // 如下伪代码模拟启动多线程执行生产任务 141 // let thread = new Thread(); 142 // thread.run(producer.run()); 143 // consumer.run(); 144 } 145} 146``` 147 148 149### Actor模型 150 151以下示例简单展示了如何使用基于Actor模型的TaskPool并发能力来解决生产者消费者问题。 152 153 154 155Actor模型中,不同角色之间并不共享内存,生产者线程和UI线程都有自己的虚拟机实例,两个虚拟机实例之间拥有独占的内存,相互隔离。生产者生产出结果后,通过序列化通信将结果发送给UI线程。UI线程消费结果后,再发送新的生产任务给生产者线程。 156 157```ts 158import { taskpool } from '@kit.ArkTS'; 159 160// 跨线程并发任务 161@Concurrent 162async function produce(): Promise<number> { 163 // 添加生产相关逻辑 164 console.info('producing...'); 165 return Math.random(); 166} 167 168class Consumer { 169 public consume(value: Object) { 170 // 添加消费相关逻辑 171 console.info('consuming value: ' + value); 172 } 173} 174 175@Entry 176@Component 177struct Index { 178 @State message: string = 'Hello World'; 179 180 build() { 181 Row() { 182 Column() { 183 Text(this.message) 184 .fontSize(50) 185 .fontWeight(FontWeight.Bold) 186 Button() { 187 Text('start') 188 }.onClick(() => { 189 let produceTask: taskpool.Task = new taskpool.Task(produce); 190 let consumer: Consumer = new Consumer(); 191 for (let index: number = 0; index < 10; index++) { 192 // 执行生产异步并发任务 193 taskpool.execute(produceTask).then((res: Object) => { 194 consumer.consume(res); 195 }).catch((e: Error) => { 196 console.error(e.message); 197 }) 198 } 199 }) 200 .width('20%') 201 .height('20%') 202 } 203 .width('100%') 204 } 205 .height('100%') 206 } 207} 208``` 209<!-- @[actor_model](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/DocsSample/ArkTS/ArkTsConcurrent/MultithreadedConcurrency/MultiThreadConcurrencyOverview/entry/src/main/ets/pages/Index.ets) --> 210 211也可以等待生产者完成所有任务,通过序列化通信将结果发送给UI线程。UI线程接收后,由消费者统一消费结果。 212 213```ts 214import { taskpool } from '@kit.ArkTS'; 215 216// 跨线程并发任务 217@Concurrent 218async function produce(): Promise<number> { 219 // 添加生产相关逻辑 220 console.info('producing...'); 221 return Math.random(); 222} 223 224class Consumer { 225 public consume(value: number) { 226 // 添加消费相关逻辑 227 console.info('consuming value: ' + value); 228 } 229} 230 231@Entry 232@Component 233struct Index { 234 @State message: string = 'Hello World' 235 236 build() { 237 Row() { 238 Column() { 239 Text(this.message) 240 .fontSize(50) 241 .fontWeight(FontWeight.Bold) 242 Button() { 243 Text('start') 244 }.onClick(async () => { 245 let dataArray = new Array<number>(); 246 let produceTask: taskpool.Task = new taskpool.Task(produce); 247 let consumer: Consumer = new Consumer(); 248 for (let index: number = 0; index < 10; index++) { 249 // 执行生产异步并发任务 250 let result = await taskpool.execute(produceTask) as number; 251 dataArray.push(result); 252 } 253 for (let index: number = 0; index < dataArray.length; index++) { 254 consumer.consume(dataArray[index]); 255 } 256 }) 257 .width('20%') 258 .height('20%') 259 } 260 .width('100%') 261 } 262 .height('100%') 263 } 264} 265``` 266 267## TaskPool和Worker 268 269ArkTS提供了TaskPool和Worker两种并发能力供开发者选择,各自的运作机制和注意事项请见[TaskPool简介](taskpool-introduction.md)和[Worker简介](worker-introduction.md),两者之间实现的特点和适用场景也存在差异,请见[TaskPool和Worker的对比](taskpool-vs-worker.md)。 270 271## 并发注意事项 272 273- 避免在并发线程中操作UI 274 275 UI操作必须在主线程中执行。并发线程中操作UI可能导致界面异常或崩溃。 276 277- 数据传递需支持序列化/反序列化 278 279 并发任务间传递数据时,对象必须是可序列化的(如基本类型、普通对象等),不可传递函数、循环引用、特殊对象(如Promise、Error)等。已完成(fulfilled或rejected)状态的 Promise可以被传递,因为其结果是可序列化的。 280 281- 合理控制并发粒度 282 283 频繁创建和销毁并发任务(如Worker、Task)会带来额外性能开销,建议复用或使用任务池机制。 284 285- 注意内存泄漏风险 286 287 避免在并发任务中持有外部对象的强引用,防止内存泄漏。 288 289- 并发任务应具备独立性 290 291 并发任务应尽量不依赖外部状态,减少竞态条件(Race Condition)和同步开销。竞态条件是指多个线程或任务同时访问并修改共享数据,执行结果依赖于任务调度的顺序,可能导致数据不一致或不可预期的行为。