• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 * Copyright (c) 2023 Hunan OpenValley Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *  http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15import Logger from '../utils/Logger';
16import IpUtils from '../utils/IpUtils';
17import TimerUtil from '../utils/TimerUtil';
18import type { Observer } from '../utils/TimerUtil';
19import SocketImpl from '../net/SocketImpl';
20import wifiManager from '@ohos.wifiManager';
21import worker from '@ohos.worker';
22import Constants from '../Constants';
23
// Prefix used in this module's log messages.
const TAG = 'BufferModel';
// Placeholder: set this to your own server's IP address and update the server-side Python code to match.
// NOTE(review): not referenced in the visible code of this file; connectToAp() uses the Wi-Fi gateway address instead.
const IP: string = 'xxx.xxx.xx.xx';
// Port passed to both SocketImpl.createSocket() and SocketImpl.connectSocket().
const PORT = 8979;
// Retry interval (seconds per the original note); timerCallback() runs the check when count % RETRY_INTERVAL === 0.
const RETRY_INTERVAL = 3;
28
29/**
30 * 负责录制和渲染的worker管理
31 * 负责socket服务器的连接与数据传输
32 */
/**
 * Manages the capturer and renderer workers and the socket connection to the server.
 *
 * - Audio buffers posted by the capturer worker are sent over the socket.
 * - Data received from the socket is forwarded to the renderer worker.
 * - A TimerUtil observer periodically checks the Wi-Fi and socket state and reports
 *   a lost connection through the callback passed to init().
 *
 * The module exports a single shared instance, so init() may be called again after
 * destroy(); both methods are written to support that.
 */
class BufferModel {
  private mSocketImpl: SocketImpl = new SocketImpl();
  private mCapturerWorker: worker.ThreadWorker;
  private mRendererWorker: worker.ThreadWorker;
  // True once a connection check has observed the socket as connected.
  private lastConnected = false;
  private onDisconnectedCallback: () => void;

  private observer: Observer;
  // True while a connection check is in flight, so timer ticks cannot overlap it.
  private connTaskRunning = false;

  /**
   * Starts the connection check loop and creates the capturer/renderer workers.
   *
   * @param callback Invoked (with no arguments) when a previously established
   *                 connection is detected as lost.
   */
  public init(callback: () => void): void {
    // Store the callback first so a worker-creation failure below cannot leave it unset.
    this.onDisconnectedCallback = callback;
    // destroy() drops the socket wrapper; recreate it when this instance is reused.
    if (!this.mSocketImpl) {
      this.mSocketImpl = new SocketImpl();
    }
    this.connect();
    try {
      this.mCapturerWorker = new worker.ThreadWorker('entry/ets/workers/CapturerWorker.ts');
      this.mRendererWorker = new worker.ThreadWorker('entry/ets/workers/RendererWorker.ts');
      // Handle messages coming from the capturer worker thread.
      this.mCapturerWorker.onmessage = this.onMessage.bind(this);
    } catch (err) {
      Logger.error(`${TAG}: init failed, ${this.describeError(err)}`);
    }
  }

  /**
   * Handles a message from the capturer worker: a WORK_MESSAGE_CAPTURER_SEND
   * message carries a buffer that is sent to the server; other codes are ignored.
   */
  private onMessage(message: { data: { code: number; buffer: ArrayBuffer } }): void {
    const payload = message.data;
    if (payload.code === Constants.WORK_MESSAGE_CAPTURER_SEND) {
      // send() handles its own errors, so it is safe not to await it here.
      this.send(payload.buffer);
    }
  }

  /**
   * Posts the start messages to both workers. Intended to be called once the
   * server, the AudioCapturer and the AudioRenderer are all ready, so the call can begin.
   */
  public startWorkTask(): void {
    try {
      this.mCapturerWorker.postMessage({ 'code': Constants.WORK_MESSAGE_CAPTURER_START });
      this.mRendererWorker.postMessage({ 'code': Constants.WORK_MESSAGE_RENDERER_START });
    } catch (err) {
      Logger.error(`${TAG}: startWorkTask failed, ${this.describeError(err)}`);
    }
  }

  /**
   * Posts the stop messages to both workers.
   */
  public stopWorkTask(): void {
    try {
      this.mCapturerWorker.postMessage({ 'code': Constants.WORK_MESSAGE_CAPTURER_STOP });
      this.mRendererWorker.postMessage({ 'code': Constants.WORK_MESSAGE_RENDERER_STOP });
    } catch (err) {
      Logger.error(`${TAG}: stopWorkTask failed, ${this.describeError(err)}`);
    }
  }

  /**
   * Closes the socket, terminates both workers and unregisters the timer observer.
   * Each step is isolated so a failure in one does not skip the others.
   */
  public destroy(): void {
    if (this.mSocketImpl) {
      try {
        this.mSocketImpl.closeSocket();
      } catch (err) {
        Logger.error(`${TAG}: closeSocket failed, ${this.describeError(err)}`);
      }
      this.mSocketImpl = null;
    }
    // Forget the previous session so a later init() does not report a stale disconnect.
    this.lastConnected = false;
    // Stop the worker threads.
    this.terminateWorker(this.mCapturerWorker, 'capturer');
    this.terminateWorker(this.mRendererWorker, 'renderer');
    if (this.observer) {
      TimerUtil.removeObserver(this.observer);
      this.observer = null;
    }
  }

  // Terminates one worker if it was created, logging (not throwing) on failure.
  private terminateWorker(target: worker.ThreadWorker, label: string): void {
    if (!target) {
      return;
    }
    try {
      target.terminate();
    } catch (err) {
      Logger.error(`${TAG}: terminate ${label} worker failed, ${this.describeError(err)}`);
    }
  }

  // Formats a caught value for logging without assuming it is an Error-like object.
  private describeError(err: unknown): string {
    if (err !== null && typeof err === 'object') {
      const detail = err as { code?: unknown; message?: unknown };
      return `code is ${detail.code}, message is ${detail.message}`;
    }
    return `error is ${String(err)}`;
  }

  // TimerUtil callback; runs the socket connection check every RETRY_INTERVAL ticks.
  private timerCallback(count: number): void {
    if (count % RETRY_INTERVAL === 0) {
      this.socketConnTask();
    }
  }

  // Runs one check immediately, then registers with the TimerUtil shared by the whole app.
  private connect(): void {
    Logger.log(`${TAG} >>>>>>>>>>>>>>>>>>>>>> start connect `);
    this.socketConnTask();
    // Avoid stacking observers if connect() runs again without destroy().
    if (this.observer) {
      TimerUtil.removeObserver(this.observer);
    }
    this.observer = TimerUtil.registerObserver((count: number) => {
      this.timerCallback(count);
    });
  }

  // Called when the connection is lost; hands control back to the caller-supplied callback.
  // lastConnected is not cleared here, so this may fire again on later checks and
  // from the socket error/close listeners until destroy() is called.
  private onDisconnected(): void {
    Logger.log(`${TAG} <<<<<<<<<<<<<<<<<<<<<< onDisconnected `);
    if (this.onDisconnectedCallback) {
      this.onDisconnectedCallback();
    }
  }

  /**
   * Socket connection task.
   *
   * 1. Not connected yet: try to connect.
   * 2. Connected, socket and Wi-Fi both healthy: nothing to do.
   * 3. Connected, but socket or Wi-Fi is down: report the disconnect.
   *
   * This polling exists because, after the network drops, the socket's onError/onClose
   * may not fire and its reported state may be wrong.
   *
   * Never rejects: failures are logged, since callers invoke this without awaiting.
   * A call made while a previous check is still running is skipped.
   */
  private async socketConnTask(): Promise<void> {
    if (this.connTaskRunning) {
      return;
    }
    this.connTaskRunning = true;
    try {
      await this.checkConnectionOnce();
    } catch (err) {
      Logger.error(`${TAG}: socketConnTask failed, ${this.describeError(err)}`);
    } finally {
      this.connTaskRunning = false;
    }
  }

  // One pass of the connection state machine described on socketConnTask().
  private async checkConnectionOnce(): Promise<void> {
    if (!this.mSocketImpl) {
      // destroy() has been called; nothing to check.
      return;
    }
    const wifiState: boolean = await this.checkWifiState();
    const socketConnected: boolean = await this.isConnected();
    Logger.log(`${TAG}  socketConnTask wifiState = ${wifiState}  socketConnected = ${socketConnected} lastConnected = ${this.lastConnected}`);
    if (!wifiState) {
      if (this.lastConnected) {
        // Was connected and Wi-Fi dropped.
        this.onDisconnected();
      }
      // With Wi-Fi down there is nothing else to do.
      return;
    }
    if (!socketConnected) {
      if (this.lastConnected) {
        // Was connected and the socket dropped.
        this.onDisconnected();
        return;
      }
      // Not connected yet: attempt the first connection.
      const ret: boolean = await this.connectToAp();
      Logger.log(`${TAG}  socketConnTask connectToAp ${ret}`);
      if (ret) {
        await this.setSocketListener();
      }
      return;
    }
    // Connected.
    this.lastConnected = true;
  }

  /**
   * Reads the current Wi-Fi link info.
   *
   * @returns true when the link's connState is CONNECTED, false otherwise.
   */
  async checkWifiState(): Promise<boolean> {
    const linkInfo = await wifiManager.getLinkedInfo();
    Logger.log(`${TAG} checkWifiState ${linkInfo.ssid} : state=${linkInfo.connState}`);
    return linkInfo.connState === wifiManager.ConnState.CONNECTED;
  }

  // Installs the message, error and close listeners on the socket.
  private async setSocketListener(): Promise<void> {
    if (!this.mSocketImpl) {
      return;
    }
    // An error or close only counts as a disconnect once a connection was confirmed.
    const reportIfConnected = (): void => {
      if (this.lastConnected) {
        this.onDisconnected();
      }
    };
    this.mSocketImpl.setOnMessageReceivedListener((result: ArrayBuffer) => {
      this.receiveData(result);
    });
    this.mSocketImpl.setOnErrorListener(reportIfConnected);
    this.mSocketImpl.setOnCloseListener(reportIfConnected);
  }

  // Asks the socket wrapper whether it is connected; false once the socket has been destroyed.
  private async isConnected(): Promise<boolean> {
    if (!this.mSocketImpl) {
      return false;
    }
    return await this.mSocketImpl.isConnected();
  }

  /**
   * Creates the local socket and connects it to the server.
   *
   * The server address is the Wi-Fi gateway: the server is expected to run on a PC
   * whose hotspot the device has joined. Adjust the address to suit your own setup.
   *
   * @returns true when both socket creation and connection succeed, false otherwise
   *          (including when no IP/gateway is available or an error is thrown).
   */
  private async connectToAp(): Promise<boolean> {
    if (!this.mSocketImpl) {
      return false;
    }
    try {
      const ipInfo: wifiManager.IpInfo = wifiManager.getIpInfo();
      const ipAddress: number = ipInfo.ipAddress;
      const gateway: number = ipInfo.gateway;

      if (ipAddress === 0 || gateway === 0) {
        return false;
      }
      const localIp: string = IpUtils.getIpAddress(ipAddress);
      const serverIp: string = IpUtils.getIpAddress(gateway);
      const created: boolean = await this.mSocketImpl.createSocket(localIp, PORT);
      if (!created) {
        return false;
      }
      const connected: boolean = await this.mSocketImpl.connectSocket(serverIp, PORT);
      return connected ? true : false;
    } catch (e) {
      Logger.error(`${TAG}: connectToAp failed, ${this.describeError(e)}`);
    }
    return false;
  }

  // Forwards a received audio buffer to the renderer worker for playback.
  // The buffer is listed as a transferable in the postMessage call.
  private async receiveData(buffer: ArrayBuffer): Promise<void> {
    Logger.log(`${TAG} TCP onReceive: `);
    try {
      this.mRendererWorker.postMessage({
        'code': Constants.WORK_MESSAGE_RENDERER_RECV,
        'buffer': buffer
      }, [buffer]);
    } catch (err) {
      // The caller does not await this method, so log instead of rejecting.
      Logger.error(`${TAG}: receiveData failed, ${this.describeError(err)}`);
    }
  }

  // Sends a captured audio buffer straight to the socket server.
  private async send(buffer: ArrayBuffer): Promise<void> {
    if (!this.mSocketImpl || !buffer) {
      // Socket already destroyed, or the worker message carried no buffer.
      return;
    }
    Logger.info(`${TAG} send buffer`);
    try {
      await this.mSocketImpl.sendData(buffer);
    } catch (err) {
      // The caller does not await this method, so log instead of rejecting.
      Logger.error(`${TAG}: send failed, ${this.describeError(err)}`);
    }
  }
}
241
242
// Single BufferModel instance shared by every importer of this module.
const sharedBufferModel: BufferModel = new BufferModel();

export default sharedBufferModel;