1# 合理使用多线程共享内存 2<!--Kit: Common--> 3<!--Subsystem: Demo&Sample--> 4<!--Owner: @mgy917--> 5<!--Designer: @jiangwensai--> 6<!--Tester: @Lyuxin--> 7<!--Adviser: @huipeizi--> 8 9## 概述 10在应用开发中,为了避免主线程阻塞,提高应用性能,需要将一些耗时操作放在子线程中执行。此时,子线程就需要访问主线程中的数据。ArkTS采用了基于消息通信的Actor并发模型,具有内存隔离的特性,所以跨线程传输数据时需要将数据序列化,但是AkrTS支持通过可共享对象SharedArrayBuffer实现直接的共享内存。 11 12在开发应用时,如果遇到数据量较大,并且需要多个线程同时操作的情况,推荐使用SharedArrayBuffer共享内存,可以减少数据在线程间传递时需要复制和序列化的额外开销。比如,音视频解码播放、多个线程同时读取写入文件等场景。由于内存是共享的,所以在多个线程同时操作同一块内存时,可能会引起数据的紊乱,这时就需要使用锁来确保数据操作的有序性。本文将基于此具体展开说明。关于多线程的使用和原理,可参考[OpenHarmony多线程能力场景化示例实践](./multi_thread_capability.md),本文将不再详细讲述。 13## 工作原理 14可共享对象SharedArrayBuffer,是拥有固定长度的原始二进制数据缓冲区,可以存储任何类型的数据,包括数字、字符串等。它支持在多线程之间传递,传递之后的SharedArrayBuffer对象和原始的SharedArrayBuffer对象可以指向同一块内存,进而达到共享内存的目的。SharedArrayBuffer对象存储的数据在子线程中被修改时,需要通过原子操作保证其同步性,即下个操作开始之前务必需要保证上个操作已经结束。下面将通过示例说明原子操作保证同步性的必要性,详细代码请参考[AtomicsUsage.ets](https://gitcode.com/openharmony/applications_app_samples/blob/master/code/Performance/PerformanceLibrary/feature/memoryShared/src/main/ets/pages/AtomicsUsage.ets)。 15### 非原子操作 16 17```javascript 18...... 19// 非原子操作,进行10000次++ 20@Concurrent 21function normalProcess(int32Array: Int32Array) { 22 for (let i = 0; i < 10000; i++) { 23 int32Array[0]++; 24 } 25} 26// 原子操作,进行10000次++ 27@Concurrent 28function atomicsProcess(int32Array: Int32Array) { 29 for (let i = 0; i < 10000; i++) { 30 Atomics.add(int32Array, 0, 1); 31 } 32} 33...... 34@State result: string = "计算结果:"; 35private taskNum: number = 2; 36private scroller: Scroller = new Scroller(); 37...... 38Button("非原子操作") 39 .width("80%") 40 .fontSize(30) 41 .fontWeight(FontWeight.Bold) 42 .margin({ top: 30 }) 43 .onClick(async () => { 44 this.sharedArrayBufferUsage(false); 45 }) 46Scroll(this.scroller) { 47 Column() { 48 Text(this.result) 49 .width("80%") 50 .fontSize(30) 51 .fontWeight(FontWeight.Bold) 52 .fontColor(Color.Blue) 53 } 54}.height("60%") 55.margin({ top: 30 }) 56...... 57// 根据传入的值isAtomics判断是否使用原子操作 58sharedArrayBufferUsage(isAtomics: boolean) { 59 // 创建长度为4的SharedArrayBuffer对象 60 let sab: SharedArrayBuffer = new SharedArrayBuffer(4); 61 // 由于SharedArrayBuffer是原始二进制数据缓冲区,无法直接使用,所以这里转换为Int32Array类型进行后续操作 62 let int32Array: Int32Array = new Int32Array(sab); 63 int32Array[0] = 0; 64 let taskGroup: taskpool.TaskGroup = new taskpool.TaskGroup(); 65 // 创建Task对象,并放入TaskGroup中执行 66 for (let i = 0; i < this.taskNum; i++) { 67 if (isAtomics) { 68 taskGroup.addTask(new taskpool.Task(atomicsProcess, int32Array)); 69 } else { 70 taskGroup.addTask(new taskpool.Task(normalProcess, int32Array)); 71 } 72 } 73 taskpool.execute(taskGroup).then(() => { 74 // 将结果打印在Text上 75 this.result = this.result + "\n" + int32Array; 76 // 如果Scroll不在最低端,则滑动到最低端 77 if (!this.scroller.isAtEnd()) { 78 this.scroller.scrollEdge(Edge.Bottom); 79 } 80 }).catch((e: BusinessError) => { 81 logger.error(e.message); 82 }) 83} 84 85``` 86在这段代码中,创建了2个task,对SharedArrayBuffer分别进行了10000次自增操作,预期的结果应该是20000。点击按钮查看计算结果,就会发现最后的结果并不一定是20000,并且每次点击后,计算的结果都可能是不同的。 87这是因为SharedArrayBuffer是共享内存的,多个线程同时进行自增时,是操作的同一块内存,而自增操作并不是原子操作,需要经过以下三个步骤: 88- 第一步,从内存中取值 89- 第二步,对取出的值+1 90- 第三步,将结果写入内存 91 92当多个线程同时操作时,就会发生这样一种情况:A线程在第一步取值1000,第二步+1操作后是1001,在执行第三步之前,B线程也去取值了,这时由于A线程还没有将结果写入内存,所以B线程取到的值依然是1000,然后A执行第三步将1001写入了内存,而B会对1000进行+1操作并将结果1001写入同一块内存。这样就会导致明明进行了两次+1的操作,但是结果并没有变成预期的1002,而是1001。所以在这个示例中会出现结果不符合预期的情况。 93### 原子操作 94下面修改一下代码,将自增操作改为使用Atomics.add()方法的原子操作。 95 96```javascript 97...... 98Button("原子操作") 99 .width("80%") 100 .fontSize(30) 101 .fontWeight(FontWeight.Bold) 102 .margin({ top: 30 }) 103 .onClick(async () => { 104 this.sharedArrayBufferUsage(true); 105 }) 106...... 107 108``` 109点击按钮查看计算结果,就会发现不论计算多少次,结果一直都是20000。这是因为,原子操作是不可中断的一个或者一系列操作,可以保证在A线程执行完取值、计算、写入内存这三个步骤之前,不会被B线程中断,也就不会发生非原子操作示例中B线程取到旧值的情况,而是每次都能拿到A线程写入内存的新值。所以,在使用SharedArrayBuffer共享内存时,一定要注意使用原子操作保证同步性,否则就可能会造成数据的紊乱。 110## 场景示例 111在应用开发中使用多线程时,会遇到处理复杂逻辑的情况,是无法保证整个线程都是一个原子操作的,此时就可以使用锁来解决一段代码的原子性问题。 112### 锁的实现 113并发编程重在解决线程间分工、同步与互斥的问题,而实现互斥的重要方式是通过锁。示例通过Atomics和SharedArrayBuffer简单实现不可重入锁类NonReentrantLock。 114constructor()通过传入可共享对象SharedArrayBuffer初始化锁,实现多线程共享同一块内存,以作为共同操作的标志位,从而控制锁的状态。 115 116```javascript 117const UNLOCKED = 0; 118const LOCKED_SINGLE = 1; 119const LOCKED_MULTI = 2; 120export class NonReentrantLock { 121 flag: Int32Array; 122 constructor(sab: SharedArrayBuffer) { // 传入一个4bytes的SharedArrayBuffer 123 this.flag= new Int32Array(sab); // 其视图为只有一位的int数组(1 = 4bytes * 8 / 32bit) 124 } 125 126 lock(): void {...} 127 tryLock(): boolean {...} 128 unlock(): void {...} 129} 130 131``` 132lock()方法用于获取锁,如果获取锁失败,则线程进入阻塞状态。 133 134```javascript 135lock(): void { 136 const flag= this.flag; 137 let c = UNLOCKED; 138 // 如果flag数组的0位置,当前值为UNLOCKED,则改为LOCKED_SINGLE;否则,进入do-while循环,阻塞线程 139 if ((c = Atomics.compareExchange(flag, 0, UNLOCKED, LOCKED_SINGLE)) !== UNLOCKED) { 140 do { 141 // 有线程拿不到锁时,修改标志位为LOCKED_MULTI,并使之进入睡眠阻塞状态 142 if (c === LOCKED_MULTI || Atomics.compareExchange(flag, 0, LOCKED_SINGLE, LOCKED_MULTI) !== UNLOCKED) { 143 Atomics.wait(flag, 0, LOCKED_MULTI); 144 } 145 // 被唤醒的线程,如果还是没有拿到锁,就回到循环中,重新进入阻塞状态 146 } while ((c = Atomics.compareExchange(flag, 0, UNLOCKED, LOCKED_MULTI)) !== UNLOCKED); 147 } 148} 149 150``` 151tryLock()方法用于尝试获取锁,如果获取锁成功则返回true,失败返回false,但不会阻塞线程。 152 153```javascript 154tryLock(): boolean { 155 const flag= this.flag; 156 return Atomics.compareExchange(flag, 0, UNLOCKED, LOCKED_SINGLE) === UNLOCKED; 157} 158 159``` 160unlock()方法用于释放锁。 161 162```javascript 163unlock(): void { 164 // 局部化flag,保证只有获取锁的线程可以释放锁 165 const flag= this.flag; 166 let v0 = Atomics.sub(flag, 0, 1); 167 if (v0 !== LOCKED_SINGLE) { 168 Atomics.store(flag, 0, UNLOCKED); 169 // 只唤醒在数组0索引位置等待的其中一个线程,去上方lock()方法while条件中检测,尝试获取锁 170 Atomics.notify(flag, 0, 1); 171 } 172} 173 174``` 175### 锁的应用 176示例通过多线程写入文件的场景,展示多线程不合理操作共享内存时,出现的线程不安全问题,进而导致输出文件乱码的情况。并通过使用上文实现的NonReentrantLock,解决该问题。 177主线程通过startWrite(useLock: boolean)方法,开启多线程写入文件,并通过useLock参数控制是否使用锁。 178 179```javascript 180@Component 181export struct LockUsage { 182 taskNum: number = 10; // 任务数,实际并行线程数依设备而定 183 baseDir: string = (this.getUIContext().getHostContext() as Context).filesDir + '/TextDir'; // 文件写入的应用沙箱路径 184 sabInLock: SharedArrayBuffer = new SharedArrayBuffer(4); // 在主线程,初始化子线程锁标志位,所使用的共享内存 185 sabForLine: SharedArrayBuffer = new SharedArrayBuffer(4); // 在主线程,初始化子线程偏移位,所使用的共享内存 186 @State result: string = ""; 187 build() { 188 Row() { 189 Column() { 190 // 不使用锁写入的按钮 191 Button($r('app.string.not_use_lock')) 192 .width("80%").fontSize(30) 193 .fontWeight(FontWeight.Bold) 194 .margin({ top: 30 }) 195 .onClick(async () => { 196 this.startWrite(false); 197 }) 198 // 使用锁写入的按钮 199 Button($r('app.string.use_lock')) 200 .width("80%") 201 .fontSize(30) 202 .fontWeight(FontWeight.Bold) 203 .margin({ top: 30 }) 204 .onClick(async () => { 205 this.startWrite(true); 206 }) 207 // 运行状态说明 208 Text(this.result) 209 .width("80%") 210 .fontSize(30) 211 .fontWeight(FontWeight.Bold) 212 .fontColor(Color.Blue) 213 .margin({ top: 30 }) 214 } 215 .width('100%') 216 } 217 .height('100%') 218 } 219 startWrite(useLock: boolean): void { 220 // 指明运行状态为“写入文件开始” 221 this.result = (this.getUIContext() 222 .getHostContext() as Context).resourceManager.getStringSync($r('app.string.write_file_start').id); 223 // 初始化写入时的偏移量 224 let whichLineToWrite: Int32Array = new Int32Array(this.sabForLine); 225 Atomics.store(whichLineToWrite, 0, 0); 226 // 开启多线程依据偏移量指定位置写入文件 227 // 通过主线程的sabInLock:SharedArrayBuffer初始化锁,保证多线程操作同一处锁标志位 228 // 通过主线程的sabForLine:SharedArrayBuffer初始化偏移位,保证多线程操作同一处偏移位置 229 let taskPoolGroup: taskpool.TaskGroup = new taskpool.TaskGroup(); 230 for (let i: number = 0; i < this.taskNum; i++) { 231 taskPoolGroup.addTask(new taskpool.Task(createWriteTask, this.baseDir, i, this.sabInLock, this.sabForLine, useLock)); 232 } 233 taskpool.execute(taskPoolGroup).then(() => { 234 // 指明运行状态为“写入文件成功” 235 this.result = (this.getUIContext() 236 .getHostContext() as Context).resourceManager.getStringSync($r('app.string.write_file_success').id); 237 }).catch(() => { 238 // 指明运行状态为“写入文件失败” 239 this.result = (this.getUIContext() 240 .getHostContext() as Context).resourceManager.getStringSync($r('app.string.write_file_failed').id); 241 }) 242 } 243} 244 245``` 246子线程根据偏移量在指定位置写入文件,并通过偏移量自增,指定下次的写入位置。 247 248```javascript 249@Concurrent 250async function createWriteTask(baseDir: string, writeText: number, sabInLock: SharedArrayBuffer, sabForLine: SharedArrayBuffer, useLock: boolean): Promise<void> { 251 class Option { // 写入文件时的接口方法参数类 252 offset: number = 0; 253 length: number = 0; 254 encoding: string = 'utf-8'; 255 256 constructor(offset: number, length: number) { 257 this.offset = offset; 258 this.length = length; 259 } 260 } 261 // 初始化输出文件目录 262 let filePath: string | undefined = undefined; 263 filePath = baseDir + useLock ? "/useLock.txt" : "/unusedLock.txt"; 264 if (!fs.accessSync(baseDir)) { 265 fs.mkdirSync(baseDir); 266 } 267 // 利用主线程传入的SharedArrayBuffer初始化锁 268 let nrl: NonReentrantLock | undefined = undefined; 269 if (useLock) { 270 nrl = new NonReentrantLock(sabInLock); 271 } 272 // 利用主线程传入的SharedArrayBuffer初始化写入文件时的偏移量 273 let whichLineToWrite: Int32Array = new Int32Array(sabForLine); 274 let str: string = writeText + '\n'; 275 for (let i: number = 0; i < 100; i++) { 276 // 获取锁 277 if (useLock && nrl !== undefined) { 278 nrl.lock(); 279 } 280 // 写入文件 281 let file: fs.File = fs.openSync(filePath, fs.OpenMode.READ_WRITE | fs.OpenMode.CREATE); 282 try { 283 fs.writeSync(file.fd, str, new Option(whichLineToWrite[0], str.length)); 284 } catch (err) { 285 logger.error(`errorCode : ${err.code},errMessage : ${err.message}`); 286 } 287 fs.closeSync(file); 288 // 修改偏移量,指定下次写入时的位置 289 whichLineToWrite[0] += str.length; 290 // 释放锁 291 if (useLock && nrl !== undefined) { 292 nrl.unlock(); 293 } 294 } 295} 296 297``` 298从应用沙箱地址查看写入的文件,可以看到unusedLock.txt文件,所写行数不足1000行,且存在乱码,如图1所示。 299 300图1 不使用锁写入的文件 301 302 303 304而usedLock.txt文件,所写行数刚好1000行,且不存在乱码,如图2所示。 305 306图2 使用锁写入的文件 307 308 309 310## 总结 311综上所述,虽然使用了基于消息通信的Actor并发模型,但是ArkTS依旧支持通过共享内存的方式进行线程间通信。同时,在使用SharedArrayBuffer进行共享内存时,也需要通过原子操作或者锁来解决线程间同步与互斥的问题。合理使用多线程共享内存,才能在保证线程安全的前提下,提升应用的性能。