1/* 2 * Copyright (C) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16import { DataMessage } from '../message/DataMessage'; 17 18export class DataMessageQueue<T> { 19 private eleArray: Array<T>; 20 21 constructor() { 22 this.eleArray = new Array<T>(); 23 } 24 25 public push(entry: T): boolean { 26 if (entry == null) { 27 return false; 28 } 29 this.eleArray.unshift(entry); 30 return true; 31 } 32 33 public pop(): T | undefined { 34 return this.eleArray.pop(); 35 } 36 37 public size(): number { 38 return this.eleArray.length; 39 } 40} 41 42interface Resolver { 43 (data: DataMessage): void; 44} 45 46export class AsyncQueue<T> { 47 private promiseQueue: DataMessageQueue<Promise<DataMessage>> = new DataMessageQueue<Promise<DataMessage>>(); 48 private resolverQueue: DataMessageQueue<Resolver> = new DataMessageQueue<Resolver>(); 49 50 add() { 51 this.promiseQueue.push( 52 new Promise((resolve) => { 53 this.resolverQueue.push(resolve); 54 }) 55 ); 56 } 57 58 enqueue(item: DataMessage) { 59 if (this.resolverQueue.size() == 0) { 60 this.add(); 61 } 62 const resolve = this.resolverQueue.pop(); 63 resolve ? resolve(item) : null; 64 } 65 66 async dequeue(): Promise<DataMessage> { 67 if (this.promiseQueue.size() == 0) { 68 this.add(); 69 } 70 return this.promiseQueue.pop() || new Promise<DataMessage>(() => {}); 71 } 72} 73