• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15import worker, { MessageEvents } from '@ohos.worker';
16import ArrayList from '@ohos.util.ArrayList';
17import { Log } from '../../utils/Log';
18import { Constants } from './Constants';
19
20
21const TAG = 'WorkerThreadPool'
22
23// 跨模块调用worker路径前缀
24const crossModulePathPrefix: string = '@bundle:com.ohos.photos/';
25const PHONE_ENTRY_NAME: string = 'phone_photos';
26const PC_ENTRY_NAME: string = 'pc_photos';
27
28/**
29 * 自定义worker线程池,负责worker线程的创建、启动、销毁
30 */
31export class WorkerThreadPool {
32  public static readonly CAPACITY: number = 3;
33  private static workerThreadPoolInstance: WorkerThreadPool;
34  private workerList: ArrayList<worker.ThreadWorker> = new ArrayList();
35
36  private constructor() {
37  }
38
39  public static getInstance(): WorkerThreadPool {
40    if (!this.workerThreadPoolInstance) {
41      Log.info(TAG, 'single instance create')
42      this.workerThreadPoolInstance = new WorkerThreadPool();
43    }
44    return this.workerThreadPoolInstance;
45  }
46
47  /**
48   * 创建worker
49   * @param relativePath  worker.ts文件所在的相对路径
50   * @param deviceType    设备类型,对应不同entryName (phone, pc)
51   */
52  public static createWorker(relativePath: string, deviceType: string): worker.ThreadWorker {
53    Log.info(TAG, 'create worker, relativePath: ' + relativePath);
54    let entryName = deviceType == Constants.DEFAULT_DEVICE_TYPE ? PHONE_ENTRY_NAME : PC_ENTRY_NAME;
55    let workerInstance: worker.ThreadWorker;
56    try {
57      workerInstance = new worker.ThreadWorker(crossModulePathPrefix + entryName + relativePath);
58    } catch (err) {
59      Log.error(TAG, 'create worker instance failed' + err);
60    }
61    return workerInstance;
62  }
63
64  /**
65   * 启动一个worker
66   * @param worker  worker对象
67   * @param buffer  传入worker的buffer
68   * @param excFunc 主线程回调函数
69   * @param name    worker名称
70   */
71  public static startWorker(worker: worker.ThreadWorker, buffer: ArrayBuffer, callback: Function, name: string): void {
72    if (!worker) {
73      Log.error(TAG, 'worker' + name + 'is null');
74    }
75    worker.postMessage(buffer);
76    worker.onmessage = function (e: MessageEvents) {
77      callback(e, name);
78      Log.info(TAG, 'worker onmessage end, terminate')
79      worker.terminate();
80    }
81    worker.onexit = function () {
82      Log.debug(TAG, 'worker' + name + 'exit');
83    }
84  }
85
86  /**
87   * 终止worker运行
88   * @param worker
89   */
90  public static terminateWorker(worker: worker.ThreadWorker) {
91    Log.info(TAG, 'terminate worker')
92    if (!worker) {
93      Log.error(TAG, 'worker is null');
94    }
95    try {
96      worker.terminate();
97    } catch (err) {
98      Log.error(TAG, 'worker terminate error: ' + JSON.stringify(err));
99    }
100  }
101
102  /**
103   * 最大容量
104   */
105  public capacity(): number {
106    return WorkerThreadPool.CAPACITY;
107  }
108
109  /**
110   * 执行workerList中的worker
111   * @param path                   worker.ts文件路径
112   * @param data                   处理的数据
113   * @param byteLengthOfOneGroup   需要以组为单位处理的数据,例如直方图的RGBA像素统计,一组4个字节,该值为4
114   * @param excFunc                主线程回调函数,用于worker结果的数据汇总
115   */
116  public run(path: string, data: ArrayBuffer, byteLengthOfOneGroup: number, mainThreadCallback: Function) {
117    Log.info(TAG, 'thread pool start running, capacity: ' + WorkerThreadPool.CAPACITY);
118    let groupCount = data.byteLength / byteLengthOfOneGroup;
119    let byteLengthOfOneWorker = Math.floor(groupCount / WorkerThreadPool.CAPACITY) * byteLengthOfOneGroup;
120    let deviceType: string = AppStorage.get('deviceType');
121    const FIRST_INDEX = 0;
122    for (let i = WorkerThreadPool.CAPACITY - 1; i >= 0; i--) {
123      let workerInstance = WorkerThreadPool.createWorker(path, deviceType);
124      this.workerList.add(workerInstance);
125      let dataSlice = (i == FIRST_INDEX) ? data.slice(i * byteLengthOfOneWorker) : data.slice(i *
126      byteLengthOfOneWorker, (i + 1) *
127      byteLengthOfOneWorker);
128      WorkerThreadPool.startWorker(workerInstance, dataSlice, mainThreadCallback, 'worker instance ' + i);
129    }
130  }
131
132  /**
133   * 终止WorkerThreadPool的所有worker
134   */
135  public stop(): void {
136    this.workerList.forEach((workerInstance, index) => {
137      Log.info(TAG, 'worker ' + index + ' terminate.')
138      WorkerThreadPool.terminateWorker(workerInstance);
139    });
140    this.workerList.clear();
141  }
142}