• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 * Copyright (c) 2023 Hunan OpenValley Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *  http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15import Logger from '../utils/Logger';
16import IpUtils from '../utils/IpUtils';
17import TimerUtil from '../utils/TimerUtil';
18import { Observer } from '../utils/TimerUtil';
19import SocketImpl from '../net/SocketImpl';
20import wifiManager from '@ohos.wifiManager';
21import worker, { MessageEvents } from '@ohos.worker';
22import Constants from '../Constants';
23
24const TAG = 'BufferModel';
25const IP: string = 'xxx.xxx.xx.xx'; // 设置自己的服务器IP地址,同步修改服务端py代码
26const PORT = 8979;
27const RETRY_INTERVAL = 3; // 重试间隔s
28
29/**
30 * 负责录制和渲染的worker管理
31 * 负责socket服务器的连接与数据传输
32 */
33class BufferModel {
34  private mSocketImpl: SocketImpl | null = null;
35  private mCapturerWorker: worker.ThreadWorker | null = null;
36  private mRendererWorker: worker.ThreadWorker | null = null;
37  private lastConnected = false;
38  private onDisconnectedCallback: (() => void) | null = null;
39  private observer: Observer | null = null;
40
41  public init(callback: () => void): void {
42    this.connect();
43    try {
44      this.mCapturerWorker = new worker.ThreadWorker('entry/ets/workers/CapturerWorker.ts');
45      this.mRendererWorker = new worker.ThreadWorker('entry/ets/workers/RendererWorker.ts');
46      // 处理来自worker线程的消息
47      this.mCapturerWorker.onmessage = (message: MessageEvents) => {
48        this.onMessage(message);
49      };
50      this.onDisconnectedCallback = callback;
51    } catch (err) {
52      Logger.error(TAG, `init failed ${JSON.stringify(err)}`);
53    }
54  }
55
56  private onMessage(message: MessageEvents): void {
57    let messageId: number = message.data.code;
58    if (messageId === Constants.WORK_MESSAGE_CAPTURER_SEND) {
59      let buffer: ArrayBuffer = message.data.buffer;
60      this.send(buffer);
61    }
62  }
63
64  /**
65   * 确认服务端和AudioCapturer以及AudioRenderer都正常后,就可以正常通话了
66   */
67  public startWorkTask(): void {
68    try {
69      if (this.mCapturerWorker && this.mRendererWorker) {
70        this.mCapturerWorker.postMessage({ 'code': Constants.WORK_MESSAGE_CAPTURER_START });
71        this.mRendererWorker.postMessage({ 'code': Constants.WORK_MESSAGE_RENDERER_START });
72      }
73    } catch (err) {
74      Logger.error(TAG, `startWorkTask failed ${JSON.stringify(err)}`);
75    }
76  }
77
78  public stopWorkTask(): void {
79    try {
80      if (this.mCapturerWorker) {
81        this.mCapturerWorker.postMessage({ 'code': Constants.WORK_MESSAGE_CAPTURER_STOP });
82      }
83      if (this.mRendererWorker) {
84        this.mRendererWorker.postMessage({ 'code': Constants.WORK_MESSAGE_RENDERER_STOP });
85      }
86    } catch (err) {
87      Logger.error(TAG, `stopWorkTask failed ${JSON.stringify(err)}`);
88    }
89  }
90
91  public destroy(): void {
92    if (this.mSocketImpl) {
93      this.mSocketImpl.closeSocket();
94      this.mSocketImpl = null;
95    }
96    //停止线程
97    try {
98      if (this.mCapturerWorker) {
99        this.mCapturerWorker.terminate();
100      }
101    } catch (err) {
102
103    }
104    try {
105      if (this.mRendererWorker) {
106        this.mRendererWorker.terminate();
107      }
108    } catch (err) {
109
110    }
111    if (this.observer) {
112      TimerUtil.getInstance().removeObserver(this.observer);
113    }
114  }
115
116  // TimerUtil回调,大概3秒一次检查socket连接
117  private timerCallback(count: number): void {
118    if (count % RETRY_INTERVAL === 0) {
119      this.socketConnTask();
120    }
121  }
122
123  // 使用 TimerUtil 全程序共用一个setInterval
124  private connect(): void {
125    Logger.log(TAG, ` >>>>>>>>>>>>>>>>>>>>>> start connect `);
126    this.socketConnTask();
127    this.observer = TimerUtil.getInstance().registerObserver((count: number) => {
128      this.timerCallback(count);
129    });
130  }
131
132  //连接断开时的回调,返回控制层处理断连逻辑
133  private onDisconnected(): void {
134    Logger.log(TAG, ` <<<<<<<<<<<<<<<<<<<<<< onDisconnected `);
135    if (this.onDisconnectedCallback) {
136      this.onDisconnectedCallback();
137    }
138  }
139
140  /**
141   * socket连接任务
142   *
143   * 1、未连接则尝试连接
144   * 2、已连接,socket状态正常,wifi状态正常,则任务执行完毕
145   * 3、已连接,socket状态不正常或wifi状态不正常,则回调连接已断开
146   * 这些逻辑是为了解决 网络断开后 socket onError或onClose 方法无回调或getState状态错误的问题
147   */
148  private async socketConnTask(): Promise<void> {
149    if (this.mSocketImpl == null) {
150      this.mSocketImpl = new SocketImpl();
151    }
152    let wifiState: boolean = await this.checkWifiState();
153    let socketConnected: boolean = await this.isConnected();
154    Logger.log(TAG, `socketConnTask wifiState = ${wifiState}  socketConnected = ${socketConnected} lastConnected = ${this.lastConnected}`);
155    if (!wifiState) {
156      if (this.lastConnected) { //已连接,wifi断开了
157        this.onDisconnected();
158      }
159      //wifi 断开了就都不处理了
160      return;
161    }
162    if (this.lastConnected && !socketConnected) { ///已连接,socket断开了
163      this.onDisconnected();
164      return;
165    }
166    if (!socketConnected) { // 尝试首次
167      // 首次连接
168      let ret: boolean = await this.connectToAp();
169      Logger.log(TAG, `socketConnTask connectToAp ${ret}`);
170      if (ret) {
171        this.setSocketListener();
172      }
173      return;
174    }
175    //已连接
176    this.lastConnected = socketConnected;
177  }
178
179  async checkWifiState(): Promise<boolean> {
180    let linkInfo = await wifiManager.getLinkedInfo();
181    Logger.log(TAG, `checkWifiState ${linkInfo.ssid} : state=${linkInfo.connState}`);
182    if (linkInfo.connState !== wifiManager.ConnState.CONNECTED) {
183      return false;
184    }
185    return true;
186  }
187
188  private async setSocketListener(): Promise<void> {
189    if (this.mSocketImpl == null) {
190      return;
191    }
192    this.mSocketImpl.setOnMessageReceivedListener((result: ArrayBuffer) => {
193      this.receiveData(result);
194    });
195    this.mSocketImpl.setOnErrorListener(() => {
196      // 连接后 出错了
197      if (this.lastConnected) {
198        this.onDisconnected();
199      }
200    });
201    this.mSocketImpl.setOnCloseListener(() => {
202      // 连接后 close了
203      if (this.lastConnected) {
204        this.onDisconnected();
205      }
206    });
207  }
208
209  private async isConnected(): Promise<boolean> {
210    if (this.mSocketImpl == null) {
211      return false;
212    }
213    return await this.mSocketImpl.isConnected();
214  }
215
216  // 发起连接
217  private async connectToAp(): Promise<boolean> {
218    try {
219      if (this.mSocketImpl == null) {
220        return false;
221      }
222      let ipInfo: wifiManager.IpInfo = wifiManager.getIpInfo();
223      let ipAddress: number = ipInfo.ipAddress;
224      let gateway: number = ipInfo.gateway;
225
226      if (ipAddress === 0 || gateway === 0) {
227        return false;
228      }
229      let localIp: string = IpUtils.getIpAddress(ipAddress);
230      let serverIp: string = IpUtils.getIpAddress(gateway);
231      let ret: boolean = await this.mSocketImpl.createSocket(localIp, PORT);
232      if (!ret) {
233        return false;
234      }
235      // 根据自己情况选择IP
236      // 服务端在pc启动,并且设备连接了pc的热点
237      ret = await this.mSocketImpl.connectSocket(serverIp, PORT);
238      if (!ret) {
239        return false;
240      }
241      return true;
242    } catch (e) {
243
244    }
245    return false;
246  }
247
248  // 收到音频流后直接送到 renderer 进行播放
249  private async receiveData(buffer: ArrayBuffer): Promise<void> {
250    Logger.log(TAG, `TCP onReceive: `);
251    if (this.mRendererWorker) {
252      this.mRendererWorker.postMessage({
253        'code': Constants.WORK_MESSAGE_RENDERER_RECV,
254        'buffer': buffer
255      }, [buffer]);
256    }
257  }
258
259  // 采集到的音频流直接发送到socket服务端
260  private async send(buffer: ArrayBuffer): Promise<void> {
261    Logger.info(TAG, `send buffer`);
262    if (this.mSocketImpl) {
263      await this.mSocketImpl.sendData(buffer);
264    }
265  }
266}
267
268
269let mModel = new BufferModel();
270
271export default mModel as BufferModel;