/*
 * Copyright (c) 2023 Hunan OpenValley Digital Industry Development Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import Logger from '../utils/Logger';
import IpUtils from '../utils/IpUtils';
import TimerUtil from '../utils/TimerUtil';
import type { Observer } from '../utils/TimerUtil';
import SocketImpl from '../net/SocketImpl';
import wifiManager from '@ohos.wifiManager';
import worker from '@ohos.worker';
import Constants from '../Constants';

const TAG = 'BufferModel';
const IP: string = 'xxx.xxx.xx.xx'; // Set this to your own server IP address and change the server-side Python code to match
const PORT = 8979;
const RETRY_INTERVAL = 3; // Retry interval (documented by the original author as seconds)

/**
 * Manages the capturer and renderer workers, and handles the connection to
 * the socket server together with the data transfer over it.
 */
class BufferModel {
  private mSocketImpl: SocketImpl = new SocketImpl();
  private mCapturerWorker: worker.ThreadWorker;
  private mRendererWorker: worker.ThreadWorker;
  // True once a timer check has observed the socket as connected.
  private lastConnected = false;
  private onDisconnectedCallback: () => void;

  private observer: Observer;

  /**
   * Starts the connection task and creates the capturer/renderer workers.
   *
   * @param callback invoked (with no arguments) when an established connection is found to be lost
   */
  public init(callback: () => void): void {
    // Store the callback before anything that can throw, so a worker creation
    // failure below does not leave the running connection task without it.
    this.onDisconnectedCallback = callback;
    // destroy() releases the socket wrapper; recreate it so this shared
    // instance can be initialised again afterwards.
    if (!this.mSocketImpl) {
      this.mSocketImpl = new SocketImpl();
    }
    this.connect();
    try {
      this.mCapturerWorker = new worker.ThreadWorker('entry/ets/workers/CapturerWorker.ts');
      this.mRendererWorker = new worker.ThreadWorker('entry/ets/workers/RendererWorker.ts');
      // Handle messages coming from the capturer worker thread.
      this.mCapturerWorker.onmessage = this.onMessage.bind(this);
    } catch (err) {
      Logger.error(`${TAG}: init failed, code is ${err.code}, message is ${err.message}`);
    }
  }

  /**
   * Worker message handler: forwards buffers tagged WORK_MESSAGE_CAPTURER_SEND
   * to the socket; every other message code is ignored.
   */
  private onMessage(message): void {
    let messageId: number = message.data.code;
    if (messageId === Constants.WORK_MESSAGE_CAPTURER_SEND) {
      let buffer: ArrayBuffer = message.data.buffer;
      this.send(buffer);
    }
  }

  /**
   * Posts the start messages to both workers. Per the original author: once
   * the server, the AudioCapturer and the AudioRenderer are all confirmed to
   * be working, the call can proceed normally.
   */
  public startWorkTask(): void {
    try {
      this.mCapturerWorker.postMessage({ 'code': Constants.WORK_MESSAGE_CAPTURER_START });
      this.mRendererWorker.postMessage({ 'code': Constants.WORK_MESSAGE_RENDERER_START });
    } catch (err) {
      Logger.error(`${TAG}: startWorkTask failed, code is ${err.code}, message is ${err.message}`);
    }
  }

  /**
   * Posts the stop messages to both workers.
   */
  public stopWorkTask(): void {
    try {
      this.mCapturerWorker.postMessage({ 'code': Constants.WORK_MESSAGE_CAPTURER_STOP });
      this.mRendererWorker.postMessage({ 'code': Constants.WORK_MESSAGE_RENDERER_STOP });
    } catch (err) {
      Logger.error(`${TAG}: stopWorkTask failed, code is ${err.code}, message is ${err.message}`);
    }
  }

  /**
   * Releases everything init() set up: the timer observer, the socket and
   * both workers.
   */
  public destroy(): void {
    // Unregister the periodic check first so no further tick is scheduled
    // against a socket that is about to be released.
    TimerUtil.removeObserver(this.observer);
    if (this.mSocketImpl) {
      this.mSocketImpl.closeSocket();
      this.mSocketImpl = null;
    }
    // Forget the previous session's state; otherwise the first check after a
    // later init() would report a disconnect instead of trying to connect.
    this.lastConnected = false;
    // Stop the worker threads. Failures are logged rather than rethrown so
    // the second worker is still terminated when the first one fails.
    try {
      this.mCapturerWorker.terminate();
    } catch (err) {
      Logger.error(`${TAG}: terminate capturer worker failed, code is ${err.code}, message is ${err.message}`);
    }
    try {
      this.mRendererWorker.terminate();
    } catch (err) {
      Logger.error(`${TAG}: terminate renderer worker failed, code is ${err.code}, message is ${err.message}`);
    }
  }

  // TimerUtil callback; runs the socket connection check whenever the tick
  // count is a multiple of RETRY_INTERVAL (roughly every 3 seconds according
  // to the original author).
  private timerCallback(count: number): void {
    if (count % RETRY_INTERVAL === 0) {
      this.socketConnTask();
    }
  }

  // Runs one connection check immediately, then registers with TimerUtil for
  // periodic checks (the original author notes that TimerUtil shares a single
  // setInterval across the whole program).
  private connect(): void {
    Logger.log(`${TAG} >>>>>>>>>>>>>>>>>>>>>> start connect `);
    this.socketConnTask();
    this.observer = TimerUtil.registerObserver((count: number) => {
      this.timerCallback(count);
    });
  }

  // Called when the connection is lost; hands control back to the caller of
  // init() through the stored callback so it can handle the disconnect.
  private onDisconnected(): void {
    Logger.log(`${TAG} <<<<<<<<<<<<<<<<<<<<<< onDisconnected `);
    if (this.onDisconnectedCallback) {
      this.onDisconnectedCallback();
    }
  }

  /**
   * Socket connection task.
   *
   * 1. Not connected yet: try to connect.
   * 2. Connected, socket and Wi-Fi both healthy: nothing more to do.
   * 3. Connected, but the socket or Wi-Fi is no longer healthy: report the
   *    disconnect through onDisconnected().
   *
   * Per the original author, this polling exists because after a network
   * drop the socket's onError/onClose callbacks may never fire, or getState
   * may report a wrong state.
   *
   * The task is started without being awaited (from connect() and the timer),
   * so every failure is caught and logged here instead of surfacing as an
   * unhandled promise rejection.
   */
  private async socketConnTask(): Promise<void> {
    try {
      if (!this.mSocketImpl) {
        // destroy() has already released the socket; nothing to check.
        return;
      }
      let wifiState: boolean = await this.checkWifiState();
      let socketConnected: boolean = await this.isConnected();
      Logger.log(`${TAG} socketConnTask wifiState = ${wifiState} socketConnected = ${socketConnected} lastConnected = ${this.lastConnected}`);
      if (!wifiState) {
        if (this.lastConnected) { // Was connected, Wi-Fi dropped
          this.onDisconnected();
        }
        // Without Wi-Fi there is nothing else to do.
        return;
      }
      if (this.lastConnected && !socketConnected) { // Was connected, socket dropped
        this.onDisconnected();
        return;
      }
      if (!socketConnected) {
        // No connection established yet: try to connect.
        let ret: boolean = await this.connectToAp();
        Logger.log(`${TAG} socketConnTask connectToAp ${ret}`);
        if (ret) {
          this.setSocketListener();
        }
        return;
      }
      // Connected.
      this.lastConnected = socketConnected;
    } catch (err) {
      Logger.error(`${TAG}: socketConnTask failed, code is ${err.code}, message is ${err.message}`);
    }
  }

  /**
   * Queries wifiManager for the current link.
   *
   * @returns true only when the link's connState is ConnState.CONNECTED
   */
  async checkWifiState(): Promise<boolean> {
    let linkInfo = await wifiManager.getLinkedInfo();
    Logger.log(`${TAG} checkWifiState ${linkInfo.ssid} : state=${linkInfo.connState}`);
    return linkInfo.connState === wifiManager.ConnState.CONNECTED;
  }

  // Installs the receive/error/close listeners on the socket. Error and close
  // are only reported as a disconnect once a connection has been observed.
  private async setSocketListener(): Promise<void> {
    if (!this.mSocketImpl) {
      // Socket was released while the connection attempt was in flight.
      return;
    }
    this.mSocketImpl.setOnMessageReceivedListener((result: ArrayBuffer) => {
      this.receiveData(result);
    });
    this.mSocketImpl.setOnErrorListener(() => {
      // An error occurred after the connection was established.
      if (this.lastConnected) {
        this.onDisconnected();
      }
    });
    this.mSocketImpl.setOnCloseListener(() => {
      // The socket closed after the connection was established.
      if (this.lastConnected) {
        this.onDisconnected();
      }
    });
  }

  // Asks the socket wrapper for its connection state; a released socket
  // counts as not connected.
  private async isConnected(): Promise<boolean> {
    if (!this.mSocketImpl) {
      return false;
    }
    return await this.mSocketImpl.isConnected();
  }

  /**
   * Creates the local socket and connects it to the server.
   *
   * The server address is taken from the Wi-Fi gateway; per the original
   * author this fits a server running on a PC whose hotspot the device has
   * joined -- choose the IP that suits your own setup.
   *
   * @returns true when the socket was created and connected; false when no
   *          IP/gateway is available yet, the socket has been released, or
   *          any step fails
   */
  private async connectToAp(): Promise<boolean> {
    try {
      if (!this.mSocketImpl) {
        return false;
      }
      let ipInfo: wifiManager.IpInfo = wifiManager.getIpInfo();
      let ipAddress: number = ipInfo.ipAddress;
      let gateway: number = ipInfo.gateway;

      if (ipAddress === 0 || gateway === 0) {
        return false;
      }
      let localIp: string = IpUtils.getIpAddress(ipAddress);
      let serverIp: string = IpUtils.getIpAddress(gateway);
      let ret: boolean = await this.mSocketImpl.createSocket(localIp, PORT);
      if (!ret) {
        return false;
      }
      ret = await this.mSocketImpl.connectSocket(serverIp, PORT);
      if (!ret) {
        return false;
      }
      return true;
    } catch (e) {
      // Log instead of swallowing silently; the timer retries later.
      Logger.error(`${TAG}: connectToAp failed, code is ${e.code}, message is ${e.message}`);
    }
    return false;
  }

  // Passes a received audio buffer straight to the renderer worker for
  // playback; the buffer is listed in the transfer list of postMessage.
  private async receiveData(buffer: ArrayBuffer): Promise<void> {
    Logger.log(`${TAG} TCP onReceive: `);
    try {
      this.mRendererWorker.postMessage({
        'code': Constants.WORK_MESSAGE_RENDERER_RECV,
        'buffer': buffer
      }, [buffer]);
    } catch (err) {
      // Called without await from the socket listener, so log here rather
      // than let the rejection go unhandled (e.g. worker already terminated).
      Logger.error(`${TAG}: receiveData failed, code is ${err.code}, message is ${err.message}`);
    }
  }

  // Sends a captured audio buffer straight to the socket server.
  private async send(buffer: ArrayBuffer): Promise<void> {
    if (!this.mSocketImpl) {
      // A capturer message arrived after destroy(); drop the buffer.
      return;
    }
    Logger.info(`${TAG} send buffer`);
    try {
      await this.mSocketImpl.sendData(buffer);
    } catch (err) {
      // Called without await from onMessage, so log here rather than let the
      // rejection go unhandled.
      Logger.error(`${TAG}: send failed, code is ${err.code}, message is ${err.message}`);
    }
  }
}


let mModel = new BufferModel();

export default mModel as BufferModel;