/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import worker, { MessageEvents } from '@ohos.worker';
import ArrayList from '@ohos.util.ArrayList';
import { Log } from '../../utils/Log';
import { Constants } from './Constants';


const TAG = 'WorkerThreadPool'

// Path prefix used when a worker script is loaded across modules.
const crossModulePathPrefix: string = '@bundle:com.ohos.photos/';
const PHONE_ENTRY_NAME: string = 'phone_photos';
const PC_ENTRY_NAME: string = 'pc_photos';

/**
 * Custom worker thread pool: creates, starts and terminates worker threads.
 *
 * The pool itself is a lazily created singleton (see getInstance); the
 * create/start/terminate helpers are static and can be used without it.
 */
export class WorkerThreadPool {
  /** Number of workers spawned by a single call to run(). */
  public static readonly CAPACITY: number = 3;
  private static workerThreadPoolInstance: WorkerThreadPool;
  /** Workers created by run(); drained by stop(). */
  private workerList: ArrayList<worker.ThreadWorker> = new ArrayList();

  private constructor() {
  }

  /**
   * Returns the shared pool instance, creating it on first use.
   */
  public static getInstance(): WorkerThreadPool {
    if (!this.workerThreadPoolInstance) {
      Log.info(TAG, 'single instance create')
      this.workerThreadPoolInstance = new WorkerThreadPool();
    }
    return this.workerThreadPoolInstance;
  }

  /**
   * Creates a worker.
   *
   * The script path is built as crossModulePathPrefix + entry name + relativePath,
   * where the entry name is the phone entry when deviceType equals
   * Constants.DEFAULT_DEVICE_TYPE and the pc entry otherwise.
   *
   * @param relativePath relative path of the worker.ts file
   * @param deviceType device type, selects the entry name (phone, pc)
   * @returns the new worker; undefined if construction threw (the error is logged)
   */
  public static createWorker(relativePath: string, deviceType: string): worker.ThreadWorker {
    Log.info(TAG, 'create worker, relativePath: ' + relativePath);
    let entryName = deviceType == Constants.DEFAULT_DEVICE_TYPE ? PHONE_ENTRY_NAME : PC_ENTRY_NAME;
    let workerInstance: worker.ThreadWorker;
    try {
      workerInstance = new worker.ThreadWorker(crossModulePathPrefix + entryName + relativePath);
    } catch (err) {
      Log.error(TAG, 'create worker instance failed' + err);
    }
    return workerInstance;
  }

  /**
   * Starts a worker: posts the buffer to it and terminates it once its first
   * message has been delivered to the callback.
   *
   * Does nothing (besides logging) when the worker is null/undefined.
   *
   * @param worker worker object
   * @param buffer buffer posted to the worker
   * @param callback main-thread callback, invoked as callback(event, name)
   * @param name worker name, used in logs and passed to the callback
   */
  public static startWorker(worker: worker.ThreadWorker, buffer: ArrayBuffer, callback: Function, name: string): void {
    if (!worker) {
      Log.error(TAG, 'worker' + name + 'is null');
      // Nothing to start; continuing would throw on the property accesses below.
      return;
    }
    // Register the handlers before posting so they are in place when the reply arrives.
    worker.onmessage = (e: MessageEvents): void => {
      try {
        callback(e, name);
      } finally {
        // Terminate even if the callback throws, so the worker is not leaked.
        Log.info(TAG, 'worker onmessage end, terminate')
        worker.terminate();
      }
    };
    worker.onexit = (): void => {
      Log.debug(TAG, 'worker' + name + 'exit');
    };
    worker.postMessage(buffer);
  }

  /**
   * Terminates a worker. Errors thrown by terminate() are logged, not rethrown.
   *
   * @param worker worker to terminate; null/undefined is logged and ignored
   */
  public static terminateWorker(worker: worker.ThreadWorker) {
    Log.info(TAG, 'terminate worker')
    if (!worker) {
      Log.error(TAG, 'worker is null');
      return;
    }
    try {
      worker.terminate();
    } catch (err) {
      Log.error(TAG, 'worker terminate error: ' + JSON.stringify(err));
    }
  }

  /**
   * Maximum capacity of the pool.
   */
  public capacity(): number {
    return WorkerThreadPool.CAPACITY;
  }

  /**
   * Splits the data into CAPACITY disjoint chunks and starts one worker per chunk.
   *
   * Every chunk except the last holds floor(groupCount / CAPACITY) whole groups;
   * the last chunk additionally takes whatever is left over, so each byte of
   * data is handed to exactly one worker.
   *
   * @param path path of the worker.ts file
   * @param data data to process
   * @param byteLengthOfOneGroup byte length of one indivisible group of data, e.g. 4 for
   *        RGBA pixel statistics of a histogram (4 bytes per pixel)
   * @param mainThreadCallback main-thread callback used to merge the workers' results
   */
  public run(path: string, data: ArrayBuffer, byteLengthOfOneGroup: number, mainThreadCallback: Function) {
    Log.info(TAG, 'thread pool start running, capacity: ' + WorkerThreadPool.CAPACITY);
    let groupCount = data.byteLength / byteLengthOfOneGroup;
    let byteLengthOfOneWorker = Math.floor(groupCount / WorkerThreadPool.CAPACITY) * byteLengthOfOneGroup;
    let deviceType: string = AppStorage.get('deviceType');
    const LAST_INDEX = WorkerThreadPool.CAPACITY - 1;
    for (let i = LAST_INDEX; i >= 0; i--) {
      let workerInstance = WorkerThreadPool.createWorker(path, deviceType);
      if (!workerInstance) {
        // createWorker already logged the cause; do not track or start a missing worker.
        Log.error(TAG, 'skip worker instance ' + i + ', create failed');
        continue;
      }
      this.workerList.add(workerInstance);
      let sliceStart = i * byteLengthOfOneWorker;
      // Only the last chunk is open-ended so it picks up the remainder; an open-ended
      // slice at index 0 would hand the whole buffer to that worker.
      let dataSlice = (i === LAST_INDEX) ?
        data.slice(sliceStart) :
        data.slice(sliceStart, sliceStart + byteLengthOfOneWorker);
      WorkerThreadPool.startWorker(workerInstance, dataSlice, mainThreadCallback, 'worker instance ' + i);
    }
  }

  /**
   * Terminates every worker tracked by this pool and clears the list.
   */
  public stop(): void {
    this.workerList.forEach((workerInstance, index) => {
      Log.info(TAG, 'worker ' + index + ' terminate.')
      WorkerThreadPool.terminateWorker(workerInstance);
    });
    this.workerList.clear();
  }
}