• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# 多线程能力场景化示例实践
2<!--Kit: Common-->
3<!--Subsystem: Demo&Sample-->
4<!--Owner: @mgy917-->
5<!--Designer: @jiangwensai-->
6<!--Tester: @Lyuxin-->
7<!--Adviser: @huipeizi-->
8
9## 简介
10
11应用中的每个[进程](../application-models/process-model-stage.md)都会有一个主线程,主线程主要承担执行UI绘制操作、管理ArkTS引擎实例的创建和销毁、分发和处理事件、管理Ability生命周期等职责,具体可参见[线程模型概述](../application-models/thread-model-stage.md)。因此,开发应用时应当尽量避免将耗时的操作放在主线程中执行。ArkTS提供了**Worker**和**TaskPool**两种多线程并发能力,多线程并发允许在同一时间段内同时执行多段代码,这两个并发的基本能力可参见[TaskPool和Worker的对比](../arkts-utils/taskpool-vs-worker.md)。
12
13在介绍**Worker**和**TaskPool**的详细使用方法前,我们先简单介绍并发模型的相关概念,以便于大家的理解。
14
15## 并发模型概述
16
17并发的意思是多个任务同时执行。并发模型分为两大类:基于内存共享的并发模型和基于消息传递的并发模型。
18
19在基于内存共享的并发模型中,并发线程通过读写内存中的共享对象来进行交互。基于共享内存的并发编程需要满足三条性质:
20
21- 原子性:指一个操作是不可中断的,要么全部执行成功要么全部执行失败。
22
23- 有序性:指程序执行的顺序必须符合预期,不能出现乱序的情况。
24
25- 可见性:指当一个线程修改了共享变量后,其他线程能够立即得知这个修改。
26
27现代程序语言一般通过锁、内存屏障、原子指令来满足这三条性质。基于内存共享的并发模型与底层硬件接近,在能正确撰写并发代码的情况下,可以最大发挥底层硬件性能,实现性能优秀的多线程程序。但是这种并发模型难以掌握,即使资深的程序员也非常容易犯错。典型的基于内存共享并发模型的程序语言有C++ 、Swift和Java等。
28
29在基于消息传递的并发模型中,并发线程的内存相互隔离,需要通过通信通道相互发送消息来进行交互。典型的基于消息传递的并发模型一般有两种:CSP和Actor。
30
31CSP(Communicating Sequential Processes,通信顺序进程)中的计算单元并不能直接互相发送信息。需要通过通道(Channel)作为媒介进行消息传递:发送方需要将消息发送到Channel,而接收方需要从Channel读取消息。与CSP不同,在Actor模型中,每个Actor可以看做一个独立的计算单元,并且相互之间内存隔离,每个Actor中存在信箱(Mail Box),Actor之间可以直接进行消息传递,如下图所示:
32
33**图1**  Actor消息传递示意图
34
35![Actor消息传递图](figures/actor-message-post.png)
36
37CSP与Actor之间的主要区别:
38
39- Actor需要明确指定消息接收方,而CSP中处理单元不用关心这些,只需要把消息发送给Channel,而接收方只需要从Channel读取消息。
40
41- 由于在默认情况下Channel是没有缓存的,因此对Channel的发送(Send)动作是同步阻塞的,直到另外一个持有该Channel引用的执行块取出消息,而Actor模型中信箱本质是队列,因此消息的发送和接收可以是异步的。
42
43典型的基于消息传递的并发模型的程序语言有:Dart、JS和ArkTS。当前系统中Worker和TaskPool都是基于Actor并发模型实现的并发能力。
44
45## Worker
46
47### 基本概念和运作原理
48
49当前系统中的Worker是一个独立的线程,基本概念可参见[TaskPool和Worker的对比](../arkts-utils/taskpool-vs-worker.md)。Worker拥有独立的运行环境,每个Worker线程和主线程一样拥有自己的内存空间、消息队列(MessageQueue)、事件轮询机制(EventLoop)、调用栈(CallStack)等。线程之间通过消息(Massage)进行交互,如下图所示:
50
51**图2**  线程交互示意图
52
53![线程交互图](figures/worker-thread-message.png)
54
55在多核的情况下(下图中的CPU 1和CPU 2同时工作),多个Worker线程(下图中的worker thread1和worker thread2)可以同时执行,因此Worker线程做到了真正的并发,如下图所示:
56
57**图3**  Worker线程并发示意图
58
59![Worker线程并发图](figures/worker-thread-concurrent.png)
60
61### 使用场景和开发示例
62
63对于Worker,有以下适用场景:
64
65- 运行时间超过3分钟的任务,需要使用Worker。
66
67- 有关联的一系列同步任务,例如数据库增、删、改、查等,要保证同一个句柄,需要使用Worker。
68
69以视频解压的场景为例,点击右上角下载按钮,该示例会执行网络下载并监听,下载完成后自动执行解压操作。当视频过大时,可能会出现解压时长超过3分钟耗时的情况,因此我们选用该场景来说明如何使用Worker。
70
71场景预览图如下所示:
72
73**图4**  场景预览图
74
75![场景预览图](figures/video-preview.jpg)
76
77使用步骤如下:
78
791. 宿主线程创建一个Worker线程。通过`new worker.ThreadWorker()`创建Worker实例,示例代码如下:
80
81   ```typescript
82   // 引入worker模块
83   import { MessageEvents, worker } from '@kit.ArkTS';
84   import { common } from '@kit.AbilityKit';
85
86   let workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/pages/workers/worker.ts', {
87     name: 'FriendsMoments Worker'
88   });
89   ```
90
912. 宿主线程给Worker线程发送任务消息。宿主线程通过postMessage方法来发送消息给Worker线程,启动下载解压任务,示例代码如下:
92
93   ```typescript
94   // 请求网络数据
95   let context: common.UIAbilityContext = this.getUIContext().getHostContext() as common.UIAbilityContext;
96   // 参数中mediaData和isImageData是根据开发者自己的业务需求添加的,其中mediaData为数据路径、isImageData为判断图片或视频的标识
97   workerInstance.postMessage({ context, mediaData: this.mediaData, isImageData: this.isImageData });
98   ```
99
1003. Worker线程监听宿主线程发送的消息。Worker线程在onmessage中接收到宿主线程的postMessage请求,执行下载解压任务,示例代码如下:
101
102   ```typescript
103   // 引入worker模块
104   import { MessageEvents, worker } from '@kit.ArkTS';
105
106   let workerPort = worker.workerPort;
107   // 接收宿主线程的postMessage请求
108   workerPort.onmessage = (e: MessageEvents): void => {
109     // 下载视频文件
110     let context: common.UIAbilityContext = e.data.context;
111     let filesDir: string = context.filesDir;
112     let time: number = new Date().getTime();
113     let inFilePath: string = `${filesDir}/${time.toString()}.zip`;
114     let mediaDataUrl: string = e.data.mediaData;
115     let urlPart: string = mediaDataUrl.split('.')[1];
116     let length: number = urlPart.split('/').length;
117     let fileName: string = urlPart.split('/')[length-1];
118     let options: zlib.Options = {
119       level: zlib.CompressLevel.COMPRESS_LEVEL_DEFAULT_COMPRESSION
120     };
121     request.downloadFile(context, {
122       url: mediaDataUrl,
123       filePath: inFilePath
124     }).then((downloadTask) => {
125       downloadTask.on('progress', (receivedSize: number, totalSize: number) => {
126         Logger.info(`receivedSize:${receivedSize},totalSize:${totalSize}`);
127       });
128       downloadTask.on('complete', () => {
129         // 下载完成之后执行解压操作
130         zlib.decompressFile(inFilePath, filesDir, options, (errData: BusinessError) => {
131           if (errData !== null) {
132             // ...
133             // 异常处理
134           }
135           let videoPath: string = `${filesDir}/${fileName}/${fileName}.mp4`;
136           workerPort.postMessage({ 'isComplete': true, 'filePath': videoPath });
137         })
138       });
139       downloadTask.on('fail', () => {
140         // ...
141         // 异常处理
142       });
143     }).catch((err) => {
144       // ...
145       // 异常处理
146     });
147   };
148   ```
149
1504. 宿主线程监听Worker线程发送的信息。宿主线程通过onmessage接收到Worker线程发送的消息,并执行下载的结果通知。
151
1525. 释放Worker资源。在业务完成或者页面销毁时,调用workerPort.close()接口主动释放Worker资源,示例代码如下所示:
153
154   ```typescript
155   workerInstance.onmessage = (e: MessageEvents): void => {
156     if (e.data) {
157       this.downComplete = e.data['isComplete'];
158       this.filePath = e.data['filePath'];
159       workerInstance.terminate();
160       setTimeout(() => {
161         this.downloadStatus = false;
162       }, LOADING_DURATION_OPEN);
163     }
164   };
165   ```
166
167## TaskPool
168
169### 基本概念和运作原理
170
171相比使用Worker实现多线程并发,TaskPool更加易于使用,创建开销也少于Worker,并且Worker线程有个数限制,需要开发者自己掌握,TaskPool的基本概念可参见[TaskPool和Worker的对比](../arkts-utils/taskpool-vs-worker.md)。TaskPool作用是为应用程序提供一个多线程的运行环境。TaskPool在Worker之上实现了调度器和Worker线程池,TaskPool根据任务的优先级,将其放入不同的优先级队列,调度器会依据自己实现的调度算法(优先级,防饥饿),从优先级队列中取出任务,放入TaskPool中的Worker线程池,执行相关任务,流程图如下所示:
172
173**图5**  TaskPool流程示意图
174
175![TaskPool流程图](figures/taskpool-process.png)
176
177TaskPool有如下的特点:
178
179- 轻量化的并行机制。
180
181- 降低整体资源的消耗。
182
183- 提高系统的整体性能。
184
185- 无需关心线程实例的生命周期。
186
187- 可以使用TaskPool API创建后台任务(Task),并对所创建的任务进行如任务执行、任务取消的操作。
188
189- 根据任务负载动态调节TaskPool工作线程的数量,以使任务按照预期时间完成任务。
190
191- 可以设置任务的优先级。
192
193- 可以设置任务组(TaskGroup)将任务关联起来。
194
195### 使用场景和开发示例
196
197TaskPool的适用场景主要分为如下三类:
198
199- 需要设置优先级的任务。
200
201- 需要频繁取消的任务。
202
203- 大量或者调度点较分散的任务。
204
205因为朋友圈场景存在不同好友同时上传视频图片,在频繁滑动时将多次触发下载任务,所以下面将以使用朋友圈加载网络数据并且进行解析和数据处理的场景为例,来演示如何使用TaskPool进行大量或调度点较分散的任务开发和处理。场景的预览图如下所示:
206
207**图6**  朋友圈场景预览图
208
209![朋友圈场景预览图](figures/friendmoment-preview.jpg)
210
211使用步骤如下:
212
2131. 首先import引入TaskPool模块,TaskPool的API介绍可参见[@ohos.taskpool(启动TaskPool)](../reference/apis-arkts/js-apis-taskpool.md)。
214
215   ```typescript
216   import { taskpool } from '@kit.ArkTS';
217   ```
218
2192. new一个task对象,其中传入被调用的方法和参数。
220
221   ```typescript
222   // ...
223   // 创建task任务项,参数1.任务执行需要传入函数 参数2.任务执行传入函数的参数 (本示例中此参数为被调用的网络地址字符串)
224   let task: taskpool.Task = new taskpool.Task(getWebData, jsonUrl);
225   // ...
226
227   // 获取网络数据
228   @Concurrent
229   async function getWebData(url: string): Promise<Array<FriendMoment>> {
230     try {
231       let webData: http.HttpResponse = await http.createHttp().request(
232         url,
233         { header: {
234             'Content-Type': 'application/json'
235         },
236           connectTimeout: 60000, readTimeout: 60000
237         });
238       if (typeof (webData.result) === 'string') {
239         // 解析json字符串
240         let jsonObj: Array<FriendMoment> = await JSON.parse(webData.result).FriendMoment;
241         let friendMomentBuckets: Array<FriendMoment> = new Array<FriendMoment>();
242         // 下方源码省略,主要为数据解析和耗时操作处理
243         // ...
244         return friendMomentBuckets;
245       } else {
246         // 异常处理
247         // ...
248       }
249     } catch (err) {
250       // 异常处理
251       // ...
252     }
253   }
254   ```
255
2563. 之后使用taskpool.execute执行TaskPool任务,将待执行的函数放入TaskPool内部任务队列等待执行。execute需要两个参数:创建的任务对象、等待执行的任务组的优先级,默认值是Priority.MEDIUM。在TaskPool中执行完数据下载、解析和处理后,再返回给主线程中。
257
258   ```typescript
259   let friendMomentArray: Array<FriendMoment> = await taskpool.execute(task, taskpool.Priority.MEDIUM) as Array<FriendMoment>;
260   ```
261
2624. 将新获取的momentData通过AppStorage.setOrCreate传入页面组件中。
263
264   ```typescript
265   // 获取页面组件中的momentData对象,其中是组件所需的username、image、video等数据
266   let momentData = AppStorage.get<FriendMomentsData>('momentData');
267   // 循环遍历对象并依次传入momentData
268   for (let i = 0; i < friendMomentArray.length; i++) {
269     momentData.pushData(friendMomentArray[i]);
270   }
271   // 将更新的momentData返回给页面组件
272   AppStorage.setOrCreate('momentData', momentData);
273   ```
274
275## 其他场景示例和方案思考
276
277在日常开发过程中,我们还会碰到一些其他并发场景问题,下面我们介绍了常用并发场景的示例方案推荐。
278
279### Worker线程调用主线程类型的方法
280
281我们在主线程中创建了一个对象,假如类型为MyMath,我们需要把这个对象传递到Worker线程中,然后在Worker线程中执行该类型中的一些耗时操作方法,比如Math中的compute方法,类结构示例代码如下:
282
283```typescript
284class MyMath {
285  a: number = 0;
286  b: number = 1;
287
288  constructor(a: number, b: number) {
289    this.a = a;
290    this.b = b;
291  }
292
293  compute(): number {
294    return this.a + this.b;
295  }
296}
297```
298
299主线程代码:
300
301```typescript
302private math: MyMath = new MyMath(2, 3); // 初始化a和b的值为2和3
303private workerInstance: worker.ThreadWorker;
304
305this.workerInstance = new worker.ThreadWorker("entry/ets/worker/MyWorker.ts");
306this.workerInstance.postMessage(this.math); // 发送到Worker线程中,期望执行MyMath中的compute方法,预期值是2+3=5
307```
308
309MyMath对象在进行线程传递后,会丢失类中的方法属性,导致我们只是在Worker线程中可以获取到MyMath的数据,但是无法在子系统中直接调用MyMath的compute方法,示意代码如下:
310
311```typescript
312const workerPort = worker.workerPort;
313workerPort.onmessage = (e: MessageEvents): void => {
314  let a = e.data.a;
315  let b = e.data.b;
316}
317```
318
319这种情况下我们可以怎么去实现在Worker线程中调用主线程中类的方法呢?
320
321首先,我们尝试使用强制转换的方式把Worker线程接收到数据强制转换成MyMath类型,示例代码如下:
322
323```typescript
324const workerPort = worker.workerPort;
325workerPort.onmessage = (e: MessageEvents): void => {
326  let math = e.data as MyMath; // 方法一:强制转换
327  console.log('math compute:' + math.compute()); // 执行失败,不会打印此日志
328}
329```
330
331强制转换后执行方法失败,不会打印此日志。因为序列化传输普通对象时,仅支持传递属性,不支持传递其原型及方法。接下来我们尝试第二种方法,根据数据重新初始化一个MyMath对象,然后执行compute方法,示例代码如下:
332
333```typescript
334const workerPort = worker.workerPort;
335workerPort.onmessage = (e: MessageEvents): void => {
336  // 重新构造原类型的对象
337  let math = new MyMath(0, 0);
338  math.a = e.data.a;
339  math.b = e.data.b;
340  console.log('math compute:' + math.compute()); // 成功打印出结果:5
341}
342```
343
344第二种方法成功在Worker线程中调用了MyMath的compute方法。但是这种方式还有弊端,比如每次使用到这个类进行传递,我们就得重新进行构造初始化,而且构造的代码会分散到工程的各处,很难进行维护,于是我们有了第三种改进方案。
345
346第三种方法,我们需要构造一个接口类,包含了我们需要线程间调用的基础方法,这个接口类主要是管理和约束MyMath类的功能规格,保证MyMath类和它的代理类MyMathProxy类在主线程和子线程的功能一致性,示例代码如下:
347
348```typescript
349interface MyMathInterface {
350  compute():number;
351}
352```
353
354然后,我们把MyMath类继承这个方法,并且额外构造一个代理类,继承MyMath类,示例代码如下:
355
356```typescript
357class MyMath implements MyMathInterface {
358  a: number = 0;
359  b: number = 1;
360
361  constructor(a: number, b: number) {
362    console.log('MyMath constructor a:' + a + ' b:' + b);
363    this.a = a;
364    this.b = b;
365  }
366
367  compute(): number {
368    return this.a + this.b;
369  }
370}
371
372class MyMathProxy implements MyMathInterface {
373  private myMath: MyMath;
374  constructor(math: MyMath) {
375    this.myMath = new MyMath(math.a, math.b);
376  }
377  // 代理MyMath类的compute方法
378  compute(): number {
379    return this.myMath.compute();
380  }
381}
382```
383
384我们在主线程构造并且传递MyMath对象后,在Worker线程中转换成MyMathProxy,即可调用到MyMath的compute方法了,并且无需在多处进行初始化构造,只要把构造逻辑放到MyMathProxy或者MyMath的构造函数中,Worker线程中的示例代码如下:
385
386```typescript
387const workerPort = worker.workerPort;
388workerPort.onmessage = (e: MessageEvents): void => {
389  // 方法三:使用代理类构造对象
390  let proxy = new MyMathProxy(e.data);
391  console.log('math compute:' + proxy.compute()); // 成功打印出结果:5
392}
393```
394
395大家可以根据实际场景选择第二种或者第三种方案。
396
397### 在TaskPool线程操作关系型数据库
398
399**场景问题**
400
401使用移动设备时,核心应用界面(如信息流、历史记录、项目列表)滑动操作频繁出现滞后与卡顿,影响功能访问与操作效率,降低用户体验。滑动性能下降源于潜在的后台耗时任务、资源管理问题等,导致界面帧率下降与反馈失真。需要对系统或应用进行排查优化。
402
403**原因分析**
404
405在应用程序运行过程中,由于主线程执行数据库查询操作耗时过长,且进行多次此类查询,导致主线程负担加重,无法及时完成界面渲染任务。由此引发的后果是,界面的更新与展示严重受阻,出现明显的卡顿现象,严重影响了用户交互体验与应用程序的整体性能表现。
406
407如下图所示,在对3000条数据进行查询的过程中,整个任务总耗时接近8秒。其中,“getListFromResultSet”函数负责查询结果数据格式化,其执行时间超过1秒;而用于检测数据库元素是否存在重复的“includes”方法,其运行时间超过5秒,这两项操作的显著耗时,成为导致滑动操作卡顿的关键因素。
408
409![数据库查询耗时图](figures/multi_thread_capability_search_rdb.PNG)
410
411**解决方案**
412
413为解决主线程因执行全量联系人查询操作而导致的界面滑动卡顿和渲染阻塞问题,采取以下优化措施:
414
415- 将全量联系人的查询任务移交给TaskPool子线程处理,使其与主线程解耦,确保查询过程不影响主线程的界面更新工作;
416
417- 减少因过度监听事件而引发的不必要的重复数据库查询,从而降低系统资源消耗,提升数据查询效率。
418
419这两项举措旨在协同改善应用程序性能,确保界面的流畅渲染,提升用户使用体验。
420
421**代码实现**
422
423**1.TaskPool线程池实现**
424
425- TaskPool子线程执行数据库新增操作
426
427  函数`taskPoolExecuteInsert` 是异步函数,用于使用线程池 (`taskPool`) 执行联系人数据库的数据新增操作。函数接收 `common.Context` 类型的 `context` 和 `Contact` 类型的 `contact` 参数。函数内部创建一个 `taskPool.Task` 实例,封装插入操作(`insert` 函数)、上下文信息(`context`)和待插入的联系人数据(`contact`)。随后,函数使用 `await` 关键字调用 `taskPool.execute(task)` 异步执行插入任务。如果执行过程中出现异常,函数通过 `catch` 语句捕获异常并使用 `Logger.error` 记录日志。
428
429  ```ts
430
431  import { taskpool } from '@kit.ArkTS';
432  import { common } from '@kit.AbilityKit';
433  import { Contact } from '../constant/Contact';
434
435  /**
436   * 使用TaskPool执行联系人数据库相关的数据新增操作
437   */
438  export async function taskPoolExecuteInsert(context: common.Context, contact: Contact): Promise<void> {
439    try {
440      let task: taskPool.Task = new taskPool.Task(insert, context, contact); // insert函数调用 需使用装饰器@Concurrent
441      await taskPool.execute(task);
442    } catch (err) {
443      Logger.error(TAG, 'insert error:' + JSON.stringify(err));
444    }
445  }
446  ```
447
448- TaskPool子线程执行数据库批量新增操作
449
450  函数`taskPoolExecuteBatchInsert`为异步操作,利用`taskPool`执行批量添加联系人至数据库的任务。它接受`common.Context`类型的`context`参数和`Array<Contact>`类型的`array`参数。内部,通过创建`taskPool.Task`实例包括`batchInsert`执行函数、`context`及待插入的联系人数据`array`,并利用`await taskPool.execute(task)`异步执行此任务。如果执行过程中出现异常,则在`catch`块中捕获并使用`Logger.error`记录日志。
451
452  ```ts
453  import { taskpool } from '@kit.ArkTS';
454  import { common } from '@kit.AbilityKit';
455  import { Contact } from '../constant/Contact';
456
457  /**
458   * 使用TaskPool执行联系人数据库相关的数据批量新增操作
459   */
460  export async function taskPoolExecuteBatchInsert(context: common.Context, array: Array<Contact>): Promise<void> {
461    try {
462      let task: taskPool.Task = new taskPool.Task(batchInsert, context, array); // batchInsert函数调用 需使用装饰器@Concurrent
463      await taskPool.execute(task);
464    } catch (err) {
465      Logger.error(TAG, 'batch insert error:' + JSON.stringify(err));
466    }
467  }
468  ```
469
470- TaskPool子线程执行数据库查询操作
471
472  异步函数`taskPoolExecuteQuery`接收`common.Context`参数`context`,利用线程池 (`taskPool`)执行联系人数据库的查询操作。其过程包括:创建`taskPool.Task`实例(封装查询操作与传递上下文参数`context`),通过`taskPool.execute`执行该任务,将结果断言为`Array<Contact>`并返回。若执行时出现异常,记录错误日志并返回空的`Contact`数组。
473
474  ```ts
475  import { taskpool } from '@kit.ArkTS';
476  import { common } from '@kit.AbilityKit';
477  import { Contact } from '../constant/Contact';
478
479  /**
480   * 使用TaskPool执行联系人数据库相关的查询操作
481   */
482  export async function taskPoolExecuteQuery(context: common.Context): Promise<Array<Contact>> {
483    try {
484      let task: taskPool.Task = new taskPool.Task(query, context); // query函数调用 需使用装饰器@Concurrent
485      let result: Array<Contact> = await taskPool.execute(task) as Array<Contact>;
486      return result;
487    } catch (err) {
488      Logger.error(TAG, 'query error:' + JSON.stringify(err));
489      return [];
490    }
491  }
492  ```
493
494**2.relationalStore数据库实现**
495
496- relationalStore数据库新增方法
497
498  异步函数`insertData`用于执行数据库新增操作,接收`common.Context`参数`context`与`Contact`对象。执行流程如下:
499  1. 检查`context`是否为空或未定义,若满足条件则记录相应日志。
500
501  2. 检查`predicates`(返回带有和表名`TABLE_NAME`匹配的Rdb谓词)是否为空或未定义,若满足条件则记录相应日志。
502
503  3. 若`this.rdbStore`未初始化,则异步调用`initRdbStore(context)`进行初始化。
504
505  4. 提取`Contact`对象中的各项属性(name、gender、phone、remark、age),构建`ValuesBucket`对象。
506
507  5. 若`this.rdbStore`已定义,执行以下操作:
508
509     a. 使用`this.rdbStore.insert`方法向表`TABLE_NAME`插入数据,冲突解决策略为`ON_CONFLICT_REPLACE`。
510
511     b. 记录日志,显示插入操作完成及返回结果。
512
513  综上,该函数主要负责根据提供的`Contact`信息,确保数据库连接后向指定表中插入新数据,并在关键环节记录日志。
514
515  ```ts
516  import { relationalStore, ValuesBucket } from '@kit.ArkData';
517  import { common } from '@kit.AbilityKit';
518  import { Contact } from '../constant/Contact';
519
520  /**
521   * 数据库新增操作
522   */
523  public async insertData(context: common.Context,  Contact: Contact): Promise<void> {
524    Logger.info(TAG, 'insert begin');
525    if (!context) {
526      Logger.info(TAG, 'context is null or undefined');
527    }
528    if (predicates === null || predicates === undefined) {
529      Logger.info(TAG, 'predicates is null or undefined');
530    }
531    if (!this.rdbStore) {
532      await this.initRdbStore(context);
533    }
534    let value1 = Contact.name;
535    let value2 = Contact.gender;
536    let value3 = Contact.phone;
537    let value4 = Contact.remark;
538    let value5 = Contact.age;
539    const valueBucket: ValuesBucket = {
540      'name': value1,
541      'gender': value2,
542      'phone': value3,
543      'remark': value4,
544      'age': value5,
545    };
546    if (this.rdbStore != undefined) {
547      let ret = await this.rdbStore.insert(TABLE_NAME, valueBucket, relationalStore.ConflictResolution.ON_CONFLICT_REPLACE);
548      Logger.info(TAG, `insert done:${ret}`);
549    }
550  }
551  ```
552
553- relationalStore数据库批量新增方法
554
555  异步函数`batchInsertData`用于执行数据库批量新增操作,接收`common.Context`参数`context`与`Array<Contact>`类型的`array`参数。执行流程如下:
556  1. 检查`context`是否为空或未定义,若满足条件则记录相应日志。
557
558  2. 检查`predicates`(返回带有和表名`TABLE_NAME`匹配的Rdb谓词)是否为空或未定义,若满足条件则记录相应日志。
559
560  3. 若`this.rdbStore`未初始化,则异步调用`initRdbStore(context)`进行初始化。
561
562  4. 提取`Array<Contact>`数组中Contact对象中的各项属性(name、gender、phone、remark、age),构建`ValuesBucket`对象并添加到`ValuesBuckets` 数组当中。
563
564  5. 若`this.rdbStore`已定义,执行以下操作:
565
566     a. 使用`this.rdbStore.batchInsert`方法向表`TABLE_NAME`批量插入数据。
567
568     b. 记录日志,显示插入操作完成及返回结果。
569
570  综上,该函数主要负责根据提供的`Array<Contact>`信息,确保数据库连接后向指定表中批量插入新数据,并在关键环节记录日志。
571
572  ```ts
573  import { relationalStore, ValuesBucket } from '@kit.ArkData';
574  import { common } from '@kit.AbilityKit';
575  import { Contact } from '../constant/Contact';
576
577  /**
578     * 批量插入数据库
579     */
580    public async batchInsertData(context: common.Context, array: Array<Contact>): Promise<void> {
581      Logger.info(TAG, 'batch insert begin');
582      if (!context) {
583        Logger.info(TAG, 'context is null or undefined');
584      }
585
586      if (predicates === null || predicates === undefined) {
587        Logger.info(TAG, 'predicates is null or undefined');
588      }
589
590      if (!this.rdbStore) {
591        await this.initRdbStore(context);
592      }
593
594      let valueBuckets: Array<ValuesBucket> = [];
595      for (let index = 0; index < array.length; index++) {
596        let Contact = array[index] as Contact;
597        let value1 = Contact.name;
598        let value2 = Contact.gender;
599        let value3 = Contact.phone;
600        let value4 = Contact.remark;
601        let value5 = Contact.age;
602
603        const valueBucket: ValuesBucket = {
604          'name': value1,
605          'gender': value2,
606          'phone': value3,
607          'remark': value4,
608          'age': value5,
609        };
610        valueBuckets.push(valueBucket);
611      }
612
613      if (this.rdbStore != undefined) {
614        let ret = await this.rdbStore.batchInsert(TABLE_NAME, valueBuckets);
615        Logger.info(TAG, `batch insert done:${ret}`);
616      }
617    }
618  ```
619
620- relationalStore数据库查询方法
621
622  函数`query`负责执行数据库查询操作,接收`common.Context`参数`context`,返回Promise封装的`Contact`数组。执行流程如下:
623  1. 检查`context`是否为空或未定义,若满足条件则记录相应日志并直接返回空数组。
624
625  2. 检查`predicates`(返回带有和表名`TABLE_NAME`匹配的Rdb谓词)是否为空或未定义,若满足条件则记录相应日志并直接返回空数组。
626
627  3. 通过`relationalStore.getRdbStore(context, STORE_CONFIG)`获取或创建数据库连接(`rdbStore`)。若未成功获取到连接,则异步调用`initRdbStore(context)`进行初始化。
628
629  4. 若成功获取到`rdbStore`,执行以下操作:
630
631     a. 使用`this.rdbStore.query(predicates, this.columns)`方法根据给定的查询条件`predicates`与列名列表`this.columns`执行查询,获取`relationalStore.ResultSet`实例。
632
633     b. 调用`this.getListFromResultSet(resultSet)`方法处理查询结果集,将其转化为`Contact`数组并返回。
634
635  综上,该函数主要负责根据提供的`context`与查询条件`predicates`(若已定义),确保数据库连接后执行查询操作,处理结果并返回`Contact`数组。在关键环节记录日志,若出现输入参数问题或无法成功建立数据库连接,则返回空数组。
636
637  ```ts
638  import { relationalStore } from '@kit.ArkData';
639  import { common } from '@kit.AbilityKit';
640
641  /**
642   * 数据库查询操作
643   */
644  public async query(context: common.Context): Promise<Array<Contact>> {
645    Logger.info(TAG, 'query begin');
646    if (!context) {
647      Logger.info(TAG, 'context is null or undefined');
648      return [];
649    }
650    if (predicates === null || predicates === undefined) {
651      Logger.info(TAG, 'predicates is null or undefined');
652      return [];
653    }
654    this.rdbStore = await relationalStore.getRdbStore(context, STORE_CONFIG);
655    if (!this.rdbStore) {
656      await this.initRdbStore(context);
657    } else {
658      // 默认查询所有列
659      let resultSet: relationalStore.ResultSet = await this.rdbStore.query(predicates, this.columns);
660      Logger.info(TAG, 'result is ' + JSON.stringify(resultSet.rowCount));
661      // 处理查询到的结果数组
662      return this.getListFromResultSet(resultSet);
663    }
664    return [];
665  }
666  ```
667
668- relationalStore数据库查询结果处理方法
669
670  函数`getListFromResultSet`用于将给定的`relationalStore.ResultSet`对象解析为`Contact`数组。首先初始化空数组`contacts`,然后逐行遍历结果集。对于每一行,提取各列数据并构建一个`Contact`对象,通过检查数组中是否存在具有相同ID的联系人来防止重复添加。遍历结束后,关闭结果集以释放资源,最后返回包含所有联系人信息的数组。
671
672  ```ts
673  /**
674   * 处理数据格式
675   */
676  getListFromResultSet(resultSet: relationalStore.ResultSet): Array<Contact> {
677    // 声明结果变量
678    let contacts: Array<Contact> = [];
679    // 进入结果集的第一行
680    resultSet.goToFirstRow();
681    // 如果没有结束就继续遍历
682    while (!resultSet.isEnded) {
683      // 读取各个属性,初始化临时变量contact
684      let contact: Contact = {
685        'id': resultSet.getDouble(resultSet.getColumnIndex('id')),
686        'name': resultSet.getString(resultSet.getColumnIndex('name')),
687        'gender': resultSet.getDouble(resultSet.getColumnIndex('gender')),
688        'phone': resultSet.getString(resultSet.getColumnIndex('phone')),
689        'age': resultSet.getLong(resultSet.getColumnIndex('age')),
690        'remark': resultSet.getString(resultSet.getColumnIndex('remark'))
691      };
692      if (!contacts.includes(contact)) {
693        // 如果数据集合中没有这条数据就添加进去
694        contacts.push(contact);
695      }
696      // 进入下一行
697      resultSet.goToNextRow();
698    }
699    // 数据整合完毕就释放资源
700    resultSet.close();
701    // 返回整合的联系人数据
702    return contacts;
703  }
704  ```
705
706**3.方法调用**
707
708- 插入数据库方法调用
709
710  创建Button按钮,点击该按钮时,依次执行3000次数据库插入操作。每次循环中,内部计数器`this.count`递增,其值经JSON.stringify处理后作为`phone`字段值,附着在对象`contact`上,通过`taskPoolExecuteInsert`函数将更新后的`contact`插入到数据库中,同时传入`context`上下文。
711
712  ```ts
713  Button('insert', { type: ButtonType.Normal, stateEffect: true })
714    .borderRadius(8)
715    .backgroundColor(0x317aff)
716    .width(120)
717    .height(40)
718    .onClick(() => {
719      for (let index = 0; index < 3000; index++) {
720        this.count++;
721        contact.phone = JSON.stringify(this.count);
722        // 插入数据库
723        taskPoolExecuteInsert(context, contact));
724      }
725    })
726  ```
727
728- 批量插入数据库方法调用
729
730  创建Button按钮,会执行以下操作:`this.sourceData`是一个包含3000个`Contact`对象的数组,通过调用`taskPoolExecuteBatchInsert`异步函数,将这3000条联系人数据批量插入至数据库中,同时传入`context`上下文。
731
732  ```ts
733  Button('batchInsert', { type: ButtonType.Normal, stateEffect: true })
734    .borderRadius(8)
735    .backgroundColor(0x317aff)
736    .width(120)
737    .height(40)
738    .onClick(() => {
739       // 批量插入数据库
740       taskPoolExecuteBatchInsert(context, this.sourceData));
741    })
742  ```
743
744- 查询数据库方法调用
745
746  创建Button按钮,点击该按钮时,触发异步数据库查询操作,通过`taskPoolExecuteQuery`函数并传入`context`。查询结果(类型为`Array<Contact>`)返回后,利用`.then()`将所得数据与组件内`dataArray`合并更新。
747
748  ```ts
749  Button('query', { type: ButtonType.Normal, stateEffect: true })
750    .borderRadius(8)
751    .backgroundColor(0x317aff)
752    .width(120)
753    .height(40)
754    .onClick(async  () => {
755       // 查询数据库
756       taskPoolExecuteQuery(context).then((contact: Array<Contact>) => {
757         this.dataArray = this.dataArray.concat(contact);
758       });
759     })
760  ```
761
762**分析比对**
763
764**数据库查询操作**
765
766- 主线程中执行查询任务
767
768  如下图所示,在主线程的调度中,包含Task1、查询数据库(queryData)以及Task2这三个相继执行的任务。查询数据库操作耗时逾9秒,延缓了后续Task2的启动,从而对主线程的及时响应与整体流畅性产生影响。
769
770  ![主线程查询任务](figures/multi_thread_capability_mainthread.PNG)
771
772- 子线程中执行查询任务
773
774  如下图所示,在主线程的任务调度中,原先是Task1、查询数据库操作(queryData),以及紧随其后的Task2。为优化性能,现已将查询数据库的操作移至一个单独的子线程Subthread_queryData中执行。此调整后,数据库查询方法的调用仅耗时2毫秒,且Task2与子线程Subthread_queryData得以并行执行,彼此互不干扰。此举有效避免了查询任务对主线程造成的任何阻塞,确保了主线程操作的流畅无阻。
775
776  ![子线程查询任务](figures/multi_thread_capability_subthread.PNG)
777
778**数据库批量插入操作**
779
780根据下图展示的批量插入数据库操作追踪详情,处理3000条记录耗时大约2.2秒,大数据批量写入任务的时间消耗。为确保应用程序的流畅操作与即时响应能力,优化策略建议将批量插入操作部署至子线程执行。减轻主线程负担,避免阻塞情况发生,增强用户体验,提高应用运行效率。
781
782![批量插入数据任务](figures/multi_thread_capability_batch_Insert.PNG)
783
784**结论**
785
786运用TaskPool线程池技术创建子线程执行数据库查询任务,可有效避免主线程阻塞,确保其专注于关键操作如界面渲染和用户交互,提升应用流畅度与用户体验。查询结果通过`.then()`异步返回,实现非阻塞处理与列表数据刷新,既充分利用系统资源、加快响应速度,又保持代码结构清晰、易于维护,是一种兼顾效率与可读性的数据库查询优化策略。
787
788同理,数据库的其他操作,包括单条数据插入、批量插入、数据修改及删除等,建议在子线程中执行,以维持应用的流畅互动性。在处理大数据量插入或批量插入任务时,多线程存在线程间通信耗时问题,用[@Sendable装饰器](../arkts-utils/arkts-sendable.md#sendable装饰器)获取性能提升。该装饰器标记的子线程返回类对象,促使系统采取共享内存策略来处理这些对象,大大减少了反序列化的成本,进一步提升了效率。具体可参考[《避免在主线程中执行耗时操作》](./avoid_time_consuming_operations_in_mainthread.md)。
789
790## 相关实例
791
792针对多线程并发,有以下相关实例可供参考:
793
794* [聊天实例应用(ArkTS)(API10)](https://gitcode.com/openharmony/applications_app_samples/tree/master/code/Solutions/IM/Chat)
795