/*
 * Copyright (c) 2023 Hunan OpenValley Digital Industry Development Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import Logger from '../utils/Logger';
import IpUtils from '../utils/IpUtils';
import TimerUtil from '../utils/TimerUtil';
import { Observer } from '../utils/TimerUtil';
import SocketImpl from '../net/SocketImpl';
import wifiManager from '@ohos.wifiManager';
import worker, { MessageEvents } from '@ohos.worker';
import Constants from '../Constants';

const TAG = 'BufferModel';
// Set this to your own server IP address and update the server-side Python code to match.
// NOTE(review): not referenced anywhere in this file; connectToAp() uses the Wi-Fi gateway
// address as the server address instead.
const IP: string = 'xxx.xxx.xx.xx';
const PORT = 8979;
// Retry interval: the connection check runs on every RETRY_INTERVAL-th timer tick
// (the original author describes this as roughly every 3 seconds).
const RETRY_INTERVAL = 3;

/**
 * Manages the capturer and renderer workers, and owns the socket connection
 * to the server together with the audio data exchanged over it.
 *
 * A single shared instance is exported from this module, so `destroy()` fully
 * resets the instance state and `init()` can be called again afterwards.
 */
class BufferModel {
  private mSocketImpl: SocketImpl | null = null;
  private mCapturerWorker: worker.ThreadWorker | null = null;
  private mRendererWorker: worker.ThreadWorker | null = null;
  // True once a periodic check has observed the socket as connected.
  private lastConnected = false;
  private onDisconnectedCallback: (() => void) | null = null;
  private observer: Observer | null = null;
  // True while socketConnTask() is awaiting, so timer ticks cannot overlap it.
  private connTaskRunning = false;

  /**
   * Starts the periodic connection task and creates both workers.
   *
   * @param callback Invoked when an established connection is detected as lost.
   */
  public init(callback: () => void): void {
    // Register the callback first so that a worker-creation failure below
    // cannot leave later disconnects unreported.
    this.onDisconnectedCallback = callback;
    this.connect();
    try {
      this.mCapturerWorker = new worker.ThreadWorker('entry/ets/workers/CapturerWorker.ts');
      this.mRendererWorker = new worker.ThreadWorker('entry/ets/workers/RendererWorker.ts');
      // Handle messages coming from the capturer worker thread.
      this.mCapturerWorker.onmessage = (message: MessageEvents) => {
        this.onMessage(message);
      };
    } catch (err) {
      Logger.error(TAG, `init failed ${JSON.stringify(err)}`);
    }
  }

  /**
   * Dispatches a worker message; captured audio buffers are forwarded to the socket.
   */
  private onMessage(message: MessageEvents): void {
    let messageId: number = message.data.code;
    if (messageId === Constants.WORK_MESSAGE_CAPTURER_SEND) {
      let buffer: ArrayBuffer = message.data.buffer;
      this.send(buffer);
    }
  }

  /**
   * Tells both workers to start. Call this once the server, the AudioCapturer
   * and the AudioRenderer are all confirmed to be ready; the call can then proceed.
   * Does nothing unless both workers exist.
   */
  public startWorkTask(): void {
    try {
      if (this.mCapturerWorker && this.mRendererWorker) {
        this.mCapturerWorker.postMessage({ 'code': Constants.WORK_MESSAGE_CAPTURER_START });
        this.mRendererWorker.postMessage({ 'code': Constants.WORK_MESSAGE_RENDERER_START });
      }
    } catch (err) {
      Logger.error(TAG, `startWorkTask failed ${JSON.stringify(err)}`);
    }
  }

  /**
   * Tells whichever workers exist to stop.
   */
  public stopWorkTask(): void {
    try {
      if (this.mCapturerWorker) {
        this.mCapturerWorker.postMessage({ 'code': Constants.WORK_MESSAGE_CAPTURER_STOP });
      }
      if (this.mRendererWorker) {
        this.mRendererWorker.postMessage({ 'code': Constants.WORK_MESSAGE_RENDERER_STOP });
      }
    } catch (err) {
      Logger.error(TAG, `stopWorkTask failed ${JSON.stringify(err)}`);
    }
  }

  /**
   * Releases the timer observer, the socket and both workers, and resets all
   * connection state so the shared instance can be initialised again without
   * reporting a stale disconnect.
   */
  public destroy(): void {
    // Stop the periodic check first so nothing tries to reconnect during teardown.
    if (this.observer) {
      TimerUtil.getInstance().removeObserver(this.observer);
      this.observer = null;
    }
    if (this.mSocketImpl) {
      try {
        this.mSocketImpl.closeSocket();
      } catch (err) {
        Logger.error(TAG, `destroy closeSocket failed ${JSON.stringify(err)}`);
      }
      this.mSocketImpl = null;
    }
    // Stop the worker threads; a failure on one must not prevent stopping the other.
    try {
      if (this.mCapturerWorker) {
        this.mCapturerWorker.terminate();
      }
    } catch (err) {
      Logger.error(TAG, `destroy capturer worker failed ${JSON.stringify(err)}`);
    }
    this.mCapturerWorker = null;
    try {
      if (this.mRendererWorker) {
        this.mRendererWorker.terminate();
      }
    } catch (err) {
      Logger.error(TAG, `destroy renderer worker failed ${JSON.stringify(err)}`);
    }
    this.mRendererWorker = null;
    // Without this reset, the first check after a later init() would see
    // "was connected, now is not" and fire a spurious disconnect.
    this.lastConnected = false;
    this.onDisconnectedCallback = null;
  }

  /**
   * TimerUtil callback: runs the connection check on every RETRY_INTERVAL-th tick.
   */
  private timerCallback(count: number): void {
    if (count % RETRY_INTERVAL === 0) {
      this.socketConnTask();
    }
  }

  /**
   * Runs one connection check immediately and registers for periodic checks.
   * Per the original author's note, TimerUtil lets the whole app share one setInterval.
   */
  private connect(): void {
    Logger.log(TAG, ` >>>>>>>>>>>>>>>>>>>>>> start connect `);
    this.socketConnTask();
    // Drop any observer left from a previous connect() so ticks are not duplicated.
    if (this.observer) {
      TimerUtil.getInstance().removeObserver(this.observer);
    }
    this.observer = TimerUtil.getInstance().registerObserver((count: number) => {
      this.timerCallback(count);
    });
  }

  /**
   * Called when the connection is lost; hands control back to the
   * controller layer, which owns the disconnect handling.
   */
  private onDisconnected(): void {
    Logger.log(TAG, ` <<<<<<<<<<<<<<<<<<<<<< onDisconnected `);
    if (this.onDisconnectedCallback) {
      this.onDisconnectedCallback();
    }
  }

  /**
   * Socket connection task.
   *
   * 1. Not connected yet: try to connect.
   * 2. Connected, and both socket and Wi-Fi are healthy: nothing to do.
   * 3. Connected, but the socket or Wi-Fi is unhealthy: report the disconnect.
   *
   * This polling exists to work around the socket's onError/onClose not firing,
   * or its reported state being wrong, after the network drops.
   *
   * The task is invoked fire-and-forget from the timer, so it never rejects and
   * never overlaps a previous run that is still awaiting.
   * `lastConnected` is left set after a disconnect is reported, so the callback
   * repeats on every check until destroy() is called.
   */
  private async socketConnTask(): Promise<void> {
    if (this.connTaskRunning) {
      return;
    }
    this.connTaskRunning = true;
    try {
      if (this.mSocketImpl == null) {
        this.mSocketImpl = new SocketImpl();
      }
      let wifiState: boolean = await this.checkWifiState();
      let socketConnected: boolean = await this.isConnected();
      Logger.log(TAG, `socketConnTask wifiState = ${wifiState} socketConnected = ${socketConnected} lastConnected = ${this.lastConnected}`);
      if (!wifiState) {
        if (this.lastConnected) { // was connected, Wi-Fi has dropped
          this.onDisconnected();
        }
        // With Wi-Fi down there is nothing else to do.
        return;
      }
      if (this.lastConnected && !socketConnected) { // was connected, socket has dropped
        this.onDisconnected();
        return;
      }
      if (!socketConnected) {
        // First connection attempt (or a retry of it).
        let ret: boolean = await this.connectToAp();
        Logger.log(TAG, `socketConnTask connectToAp ${ret}`);
        if (ret) {
          this.setSocketListener();
        }
        return;
      }
      // Connected.
      this.lastConnected = socketConnected;
    } catch (err) {
      Logger.error(TAG, `socketConnTask failed ${JSON.stringify(err)}`);
    } finally {
      this.connTaskRunning = false;
    }
  }

  /**
   * Reports whether Wi-Fi is currently in the CONNECTED state.
   *
   * @returns true when the link state is CONNECTED; false otherwise, including
   * when the link information cannot be queried.
   */
  async checkWifiState(): Promise<boolean> {
    try {
      let linkInfo = await wifiManager.getLinkedInfo();
      Logger.log(TAG, `checkWifiState ${linkInfo.ssid} : state=${linkInfo.connState}`);
      return linkInfo.connState === wifiManager.ConnState.CONNECTED;
    } catch (err) {
      // Treat a failed query as "not connected" so the caller's Wi-Fi-down path still runs.
      Logger.error(TAG, `checkWifiState failed ${JSON.stringify(err)}`);
      return false;
    }
  }

  /**
   * Wires the socket's data, error and close listeners to this model.
   */
  private async setSocketListener(): Promise<void> {
    if (this.mSocketImpl == null) {
      return;
    }
    this.mSocketImpl.setOnMessageReceivedListener((result: ArrayBuffer) => {
      this.receiveData(result);
    });
    this.mSocketImpl.setOnErrorListener(() => {
      // An error occurred after the connection was established.
      if (this.lastConnected) {
        this.onDisconnected();
      }
    });
    this.mSocketImpl.setOnCloseListener(() => {
      // The socket closed after the connection was established.
      if (this.lastConnected) {
        this.onDisconnected();
      }
    });
  }

  /**
   * @returns whether the socket reports itself as connected; false when there is no socket.
   */
  private async isConnected(): Promise<boolean> {
    if (this.mSocketImpl == null) {
      return false;
    }
    return await this.mSocketImpl.isConnected();
  }

  /**
   * Initiates the connection: creates the socket on the local Wi-Fi address and
   * connects to the Wi-Fi gateway address on PORT.
   *
   * @returns true when both steps succeed; false on any failure.
   */
  private async connectToAp(): Promise<boolean> {
    try {
      const socket = this.mSocketImpl;
      if (socket == null) {
        return false;
      }
      let ipInfo: wifiManager.IpInfo = wifiManager.getIpInfo();
      let ipAddress: number = ipInfo.ipAddress;
      let gateway: number = ipInfo.gateway;

      if (ipAddress === 0 || gateway === 0) {
        return false;
      }
      let localIp: string = IpUtils.getIpAddress(ipAddress);
      let serverIp: string = IpUtils.getIpAddress(gateway);
      let ret: boolean = await socket.createSocket(localIp, PORT);
      if (!ret) {
        return false;
      }
      if (this.mSocketImpl !== socket) {
        // destroy() ran while the socket was being created; do not connect it.
        return false;
      }
      // Choose the server IP according to your own setup. Here the server is
      // started on a PC and the device is connected to that PC's hotspot.
      ret = await socket.connectSocket(serverIp, PORT);
      return ret;
    } catch (err) {
      Logger.error(TAG, `connectToAp failed ${JSON.stringify(err)}`);
    }
    return false;
  }

  /**
   * Forwards a received audio buffer straight to the renderer worker for playback.
   * The buffer is passed in the transfer list of postMessage.
   */
  private async receiveData(buffer: ArrayBuffer): Promise<void> {
    Logger.log(TAG, `TCP onReceive: `);
    if (this.mRendererWorker) {
      this.mRendererWorker.postMessage({
        'code': Constants.WORK_MESSAGE_RENDERER_RECV,
        'buffer': buffer
      }, [buffer]);
    }
  }

  /**
   * Sends a captured audio buffer straight to the socket server.
   * Failures are logged rather than thrown because callers do not await this.
   */
  private async send(buffer: ArrayBuffer): Promise<void> {
    Logger.info(TAG, `send buffer`);
    try {
      if (this.mSocketImpl) {
        await this.mSocketImpl.sendData(buffer);
      }
    } catch (err) {
      Logger.error(TAG, `send failed ${JSON.stringify(err)}`);
    }
  }
}


const mModel = new BufferModel();

export default mModel as BufferModel;