• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.8
2#
3#   Copyright 2024 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import json
19import logging
20import numpy
21import pandas
22import os
23import re
24import subprocess
25import time
26from acts import asserts
27from acts import base_test
28from acts import context
29from acts import utils
30import acts_contrib.test_utils.bt.bt_test_utils as btutils
31from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
33
34SHORT_SLEEP = 1
35MED_SLEEP = 5
36
37
38class UwbRangingLink(object):
39    def __init__(self, initiator, responder, uwb_ranging_params):
40        self.initiator_dut = initiator
41        self.responder_dut = responder
42        self.uwb_ranging_params = uwb_ranging_params
43        self.responder_dut.adb.shell('cmd uwb force-country-code enabled US')
44        self.initiator_dut.adb.shell('cmd uwb force-country-code enabled US')
45        self.responder_dut.adb.shell('cmd uwb enable-uwb')
46        self.initiator_dut.adb.shell('cmd uwb enable-uwb')
47        self.ANTENNA_MAPPING = {'ranging': 'none', 'patch': 'azimuth-only'}
48
49    def setup_initiator(self):
50        self.initiator_dut.adb.shell('cmd uwb start-fira-ranging-session '
51                                     '-i 1 -c {} -t controller -r initiator -a 11 -d 22 -e {} -j 3600000'.format(
52            self.uwb_ranging_params['channel'], self.ANTENNA_MAPPING[self.uwb_ranging_params['initiator_antenna']]))
53
54    def clean_up_initiator(self):
55        self.initiator_dut.adb.shell('cmd uwb stop-all-ranging-sessions')
56
57    def measure_range(self, duration):
58        self.responder_dut.adb.shell('cmd uwb start-fira-ranging-session '
59                                     '-i 1 -c {} -t controlee -r responder -a 22 -d 11 -e {} -j 3600000'.format(
60            self.uwb_ranging_params['channel'], self.ANTENNA_MAPPING[self.uwb_ranging_params['responder_antenna']]))
61        ranging_output = ''
62        for idx in range(int(duration/3)):
63            ranging_output = ranging_output + self.responder_dut.adb.shell('cmd uwb get-ranging-session-reports 1')
64            time.sleep(3)
65        self.responder_dut.adb.shell('cmd uwb stop-all-ranging-sessions')
66        logging.debug(ranging_output)
67        ranging_result = self.parse_ranging_result(ranging_output)
68        return ranging_result
69
70    def parse_ranging_result(self, ranging_output):
71        pattern = r"meters: ([\d.]+).*rssiDbm: (-?\d+)"
72        matches = re.findall(pattern, ranging_output)
73
74        measurements = []
75        for match in matches:
76            meters, rssiDbm = match
77            measurements.append({"distance": float(meters), "rssi": int(rssiDbm)})
78        distance_array = [result['distance'] for result in measurements]
79        rssi_array = [result['rssi'] for result in measurements]
80        avg_distance = numpy.mean(distance_array)
81        std_dev_distance = numpy.std(distance_array)
82        avg_rssi = numpy.mean(rssi_array)
83        std_dev_rssi = numpy.std(rssi_array)
84        result = {
85            'raw_results': measurements,
86            'compiled_results': {'distance' : distance_array, 'rssi': rssi_array},
87            'summary': {'avg_distance': avg_distance, 'avg_rssi': avg_rssi,
88                        'std_dev_distance': std_dev_distance, 'std_dev_rssi': std_dev_rssi,
89                        }
90        }
91        return result
92
93class BtRangingLinkV2(object):
94    def __init__(self, initiator, responder, bt_ranging_params):
95        # self.dut1 = self.android_devices[0]
96        # self.dut2 = self.android_devices[1]
97        self.initiator = initiator
98        self.reflector = responder
99        self.bt_ranging_params = bt_ranging_params
100        utils.sync_device_time(self.initiator)
101        utils.sync_device_time(self.reflector)
102        self.setup_devices()
103
104    def setup_devices(self):
105        self.reflector.adb.shell("cmd bluetooth_manager disable")
106        time.sleep(MED_SLEEP)
107        try:
108            self.reflector.adb.shell("rm data/misc/bluetooth/logs/cs_log*")
109        except:
110            logging.info('Could not delete CS logs')
111        self.reflector.adb.shell("cmd bluetooth_manager enable")
112        self.setprop_overwrite_default(self.reflector)
113        self.restart_app(self.reflector)
114        time.sleep(SHORT_SLEEP)
115        self.app_click_reflector(self.reflector)
116
117        self.initiator.adb.shell("cmd bluetooth_manager disable")
118        time.sleep(MED_SLEEP)
119        try:
120            self.initiator.adb.shell("rm data/misc/bluetooth/logs/cs_log*")
121        except:
122            logging.info('Could not delete CS logs')
123        self.initiator.adb.shell("cmd bluetooth_manager enable")
124        self.setprop_overwrite_default(self.initiator)
125    def restart_app(self, device):
126        device.ensure_screen_on()
127        device.unlock_screen()
128        device.adb.shell("am force-stop com.android.bluetooth.channelsoundingtestapp")
129        time.sleep(2)
130        device.adb.shell(
131            "am start -n com.android.bluetooth.channelsoundingtestapp/com.android.bluetooth.channelsoundingtestapp.MainActivity")
132
133    def app_click_reflector(self, device):
134        # step 1
135        filename_re = re.compile(r'([^ ]+.xml)')
136        #st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(device.serial), shell=True).decode('utf-8')
137        st = device.adb.shell('uiautomator dump')
138        filename_device = filename_re.findall(st)[0]
139        device.adb.pull('{} tmp.xml'.format(filename_device))
140        with open("tmp.xml", 'r') as f:
141            xml = f.read()
142
143        button_reflector = re.compile(
144            r'text="Reflector" resource-id="com.android.bluetooth.channelsoundingtestapp:id/button_reflector" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
145        )
146        button_reflector_cord = self.get_cord(button_reflector, xml)
147
148        print("Push [Reflector] on reflector")
149        self.push_button(device,button_reflector_cord)
150        #device.adb.shell("input tap {} {}".format(button_reflector_cord[0], button_reflector_cord[1]))
151        time.sleep(0.5)
152
153        # step 2
154        filename_re = re.compile(r'([^ ]+.xml)')
155        #st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(device.serial), shell=True).decode('utf-8')
156        st = device.adb.shell('uiautomator dump')
157        filename_device = filename_re.findall(st)[0]
158        device.adb.pull('{} tmp.xml'.format(filename_device))
159        with open("tmp.xml", 'r') as f:
160            xml = f.read()
161
162        button_start_advertising = re.compile(
163            r'text="Start Advertising" resource-id="com.android.bluetooth.channelsoundingtestapp:id/btn_advertising" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
164        )
165        button_start_advertising_cord = self.get_cord(button_start_advertising, xml)
166
167        print("Push [Start Advertising] on reflector")
168        self.push_button(device, button_start_advertising_cord)
169        #device.adb.shell("input tap {} {}".format(button_start_advertising_cord[0],
170        #                                                      button_start_advertising_cord[1]))
171        time.sleep(0.5)
172
173
174    def app_click_initiator(self, device, duration=10):
175        # step 1
176        filename_re = re.compile(r'([^ ]+.xml)')
177        st = device.adb.shell('uiautomator dump')
178        #st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(device.serial), shell=True).decode('utf-8')
179        filename_device = filename_re.findall(st)[0]
180        device.adb.pull('{} tmp.xml'.format(filename_device))
181        with open("tmp.xml", 'r') as f:
182            xml = f.read()
183
184        button_initiator = re.compile(
185            r'text="Initiator" resource-id="com.android.bluetooth.channelsoundingtestapp:id/button_initiator" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
186        )
187        button_initiator_cord = self.get_cord(button_initiator, xml)
188
189        print("Push [Initiator] on initiator")
190        self.push_button(device, button_initiator_cord)
191        #device.adb.shell("input tap {} {}".format(button_initiator_cord[0], button_initiator_cord[1]))
192        time.sleep(0.5)
193
194        # step 2
195        filename_re = re.compile(r'([^ ]+.xml)')
196        st = device.adb.shell('uiautomator dump')
197        #st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(device.serial), shell=True).decode('utf-8')
198        filename_device = filename_re.findall(st)[0]
199        device.adb.pull('{} tmp.xml'.format(filename_device))
200        with open("tmp.xml", 'r') as f:
201            xml = f.read()
202
203        button_connect_gatt = re.compile(
204            r'text="Connect Gatt" resource-id="com.android.bluetooth.channelsoundingtestapp:id/btn_connect_gatt" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
205        )
206        button_connect_gatt_cord = self.get_cord(button_connect_gatt, xml)
207
208        button_start_distance_measurement = re.compile(
209            r'text="Start Distance Measurement" resource-id="com.android.bluetooth.channelsoundingtestapp:id/btn_cs" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
210        )
211        button_start_distance_measurement_cord = self.get_cord(button_start_distance_measurement, xml)
212
213        print("Push [Connect] on initiator")
214        self.push_button(device, button_connect_gatt_cord)
215        #device.adb.shell(
216        #    "input tap {} {}".format(button_connect_gatt_cord[0], button_connect_gatt_cord[1]))
217        time.sleep(MED_SLEEP)
218
219        print("Push [Start Distance Measurement] on initiator")
220        self.push_button(device, button_start_distance_measurement_cord)
221        #device.adb.shell("input tap {} {}".format(button_start_distance_measurement_cord[0],
222        #                                                      button_start_distance_measurement_cord[1]))
223        time.sleep(0.5)
224
225        # step 3
226        print('wait for {} seconds'.format(duration))
227        time.sleep(duration)
228
229        # if want infinite until button press
230        # _ = input("Press Enter to continue...")
231
232        print("Push [Stop Distance Measurement] on initiator")
233        self.push_button(device, button_start_distance_measurement_cord)
234        #device.adb.shell("input tap {} {}".format(button_start_distance_measurement_cord[0],
235        #                                                      button_start_distance_measurement_cord[1]))
236        time.sleep(0.5)
237
238
239    def get_cord(self, p, fi):
240        x1, y1, x2, y2 = p.findall(fi)[0]
241        cord = ((int(x1) + int(x2)) // 2, (int(y1) + int(y2)) // 2)
242        return cord
243
244
245    def push_button(self, device, button_cord):
246        device.ensure_screen_on()
247        time.sleep(SHORT_SLEEP)
248        device.adb.shell("input tap {0} {1}".format(button_cord[0], button_cord[1]))
249
250
251    def setprop_and_load_cal(self, device):
252        subprocess.run(["./cs_setprop.sh", device])
253        time.sleep(SHORT_SLEEP)
254        subprocess.run(["./load_cal_P24_zero.sh", device])
255        time.sleep(SHORT_SLEEP)
256
257
258    def setprop_overwrite_default(self, device):
259        device.adb.shell('setprop bluetooth.core.cs.channel_map 1FFFFFFFFFFFFC7FFFFC')
260        device.adb.shell('setprop bluetooth.core.cs.max_procedure_count 4')
261        device.adb.shell('setprop bluetooth.core.cs.max_subevent_len 2000000')
262
263
264    def unlock_device(self, device):
265        device.adb.shell("input keyevent 82")
266
267
268    def start_channel_sounding(self, init_device, refl_device):
269        self.app_click_reflector(refl_device)
270        self.app_click_initiator(init_device)
271
272    def parse_bt_cs_results(self, log_path):
273        measurements = []
274        with open(log_path,'r') as log:
275            for line in log:
276                if 'resultMeters' in line:
277                    range = float(line.split(' ')[1].rstrip().rstrip(','))
278                    if range == -1.976171:
279                        measurements.append(float('nan'))
280                    else:
281                        measurements.append(range)
282        avg_distance = numpy.mean(measurements)
283        std_dev_distance = numpy.std(measurements)
284        avg_rssi = 0
285        std_dev_rssi = 0
286        result = {
287            'measurements': measurements,
288            'summary': {'avg_distance': avg_distance, 'avg_rssi': avg_rssi,
289                        'std_dev_distance': std_dev_distance, 'std_dev_rssi': std_dev_rssi,
290                        }
291        }
292        return result
293
294    def collect_log_and_rename(self, device, test_name):
295        files = device.get_file_names("data/misc/bluetooth/logs")
296        log_folder = context.get_current_context().get_full_output_path() + "/cs_logs"
297        os.system("mkdir {0}".format(log_folder))
298        for filename in files:
299            if "cs_log" not in filename:
300                continue
301            log_path = context.get_current_context().get_full_output_path() + "/cs_logs/" + test_name + ".txt"
302            device.pull_files(filename, log_path)
303        return log_path
304    def do_channel_sounding(self, test_name="temp", duration=20):
305        self.initiator.adb.shell("cmd bluetooth_manager enable")
306
307        # App setup
308        self.restart_app(self.initiator)
309        time.sleep(SHORT_SLEEP)
310
311        self.app_click_initiator(self.initiator, duration)
312
313        # self.collect_log_and_rename(self.initiator, test_name=test_name)
314        log_filepath = "./{0}".format(test_name)
315        #adb("pull data/misc/bluetooth/logs {0}".format(log_filepath), device.serial)
316        #self.initiator.pull_files('data/misc/bluetooth/logs', log_filepath)
317
318        log_file_path = self.collect_log_and_rename(self.initiator, test_name=test_name)
319        result = self.parse_bt_cs_results(log_file_path)
320
321        self.initiator.adb.shell("cmd bluetooth_manager disable")
322        self.initiator.adb.shell("rm data/misc/bluetooth/logs/cs_log*")
323        return result
324
325class BtRangingLink(object):
326    def __init__(self, initiator, responder, bt_ranging_params):
327        # self.dut1 = self.android_devices[0]
328        # self.dut2 = self.android_devices[1]
329        self.initiator = initiator
330        self.reflector = responder
331        self.bt_ranging_params = bt_ranging_params
332        self.CSParameters = bt_ranging_params['CSParameters']
333        utils.sync_device_time(self.initiator)
334        utils.sync_device_time(self.reflector)
335        self.setup_devices()
336
337    def setup_devices(self):
338        # CS setprop
339        for dut in [self.initiator, self.reflector]:
340            self.cs_setprop(dut)
341            time.sleep(SHORT_SLEEP)
342            logging.info('Loading BT cal')
343            subprocess.call([self.bt_ranging_params['calibration_file'], dut.serial])
344            #self.load_cal(dut)
345            time.sleep(SHORT_SLEEP)
346            dut.button_cord = self.app_setup(dut)
347        time.sleep(MED_SLEEP)
348
349    def cs_setprop(self, dut):
350        logging.info("{0} setting CS prop ...".format(dut.serial))
351        for key in self.CSParameters:
352            dut.adb.shell("setprop bluetooth.core.cs.{0} {1}".format(key, self.CSParameters[key]))
353
354        ori_mask = dut.adb.getprop("persist.bluetooth.bqr.event_mask")
355        new_mask = (int(ori_mask) | 1)
356        dut.adb.shell("setprop persist.bluetooth.bqr.event_mask {0}".format(new_mask))
357        dut.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms 500")
358
359        dut.adb.shell("touch data/misc/bluetooth/logs/cs_log_tmp.txt")
360        dut.adb.shell("cmd bluetooth_manager disable")
361        dut.adb.shell("cmd bluetooth_manager wait-for-state:STATE_OFF")
362        dut.adb.shell("rm data/misc/bluetooth/logs/cs_log*")
363        dut.adb.shell("cmd bluetooth_manager enable")
364
365        time.sleep(MED_SLEEP)
366
367    def load_cal(self, dut):
368        logging.info("{0} loading calibration file ...".format(dut.serial))
369        with open(self.bt_ranging_params['calibration_file'], 'r') as f:
370            for line in f:
371                if not line[0] == '#':
372                    dut.adb.shell("/vendor/bin/hw/hci_inject -c {0}".format(line))
373
374    def app_setup(self, dut):
375        logging.info("{0} restarting CS App ...".format(dut.serial))
376        dut.ensure_screen_on()
377        dut.unlock_screen()
378        self._restart_app(dut)
379
380        button_connect_gatt = re.compile(
381            r'text="Connect Gatt" resource-id="com.example.ble_test:id/btn_connect_gatt" class="android.widget.Button" package="com.example.ble_test" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
382        )
383        button_adv = re.compile(
384            r'text="Start Adv" resource-id="com.example.ble_test:id/btn_adv" class="android.widget.Button" package="com.example.ble_test" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
385        )
386        button_clear_log = re.compile(
387            r'text="Clear log" resource-id="com.example.ble_test:id/btn_clear_log" class="android.widget.Button" package="com.example.ble_test" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
388        )
389        button_cs = re.compile(
390            r'text="Start CS" resource-id="com.example.ble_test:id/btn_cs" class="android.widget.Button" package="com.example.ble_test" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"'
391        )
392
393        dut_xml = ""
394        filename_re = re.compile(r'([^ ]+.xml)')
395        st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(dut.serial), shell=True).decode('utf-8')
396        filename_device = filename_re.findall(st)[0]
397        dut.adb.pull("{0} tmp.xml".format(filename_device))
398        with open("tmp.xml", 'r') as f:
399            dut_xml = f.read()
400        connect_gatt_cord = self._get_cord(button_connect_gatt, dut_xml)
401        adv_cord = self._get_cord(button_adv, dut_xml)
402        clear_log_cord = self._get_cord(button_clear_log, dut_xml)
403        cs_cord = self._get_cord(button_cs, dut_xml)
404        button_cord = {"connect_gatt": connect_gatt_cord,
405                       "adv": adv_cord,
406                       "clear_log": clear_log_cord,
407                       "cs": cs_cord}
408
409        return button_cord
410
411    def collect_log_and_rename(self, device, test_name):
412        files = device.get_file_names("data/misc/bluetooth/logs")
413        log_folder = context.get_current_context().get_full_output_path() + "/cs_logs"
414        os.system("mkdir {0}".format(log_folder))
415        for filename in files:
416            if "cs_log" not in filename:
417                continue
418            log_path = context.get_current_context().get_full_output_path() + "/cs_logs/" + test_name + ".txt"
419            device.pull_files(filename, log_path)
420        return log_path
421
422    def collect_bt_metric(self, tag):
423        self._get_bt_link_metrics(self.initiator, duration=5, bqr_tag='Monitoring , Handle: 0x0040', tag=tag)
424        self._get_bt_link_metrics(self.reflector, duration=5, bqr_tag='Monitoring , Handle: 0x0040', tag=tag)
425
426
427    def start_advertising(self):
428        # Start CS
429        logging.info("Push [Start Adv] on reflector")
430        self._push_button(self.reflector, self.reflector.button_cord["adv"])
431        time.sleep(SHORT_SLEEP)
432
433    def parse_bt_cs_results(self, log_path):
434        measurements = []
435        with open(log_path,'r') as log:
436            for line in log:
437                if 'resultMeters' in line:
438                    range = float(line.split(' ')[1].rstrip())
439                    if range == -1.976171:
440                        measurements.append(float('nan'))
441                    else:
442                        measurements.append(range)
443        avg_distance = numpy.mean(measurements)
444        std_dev_distance = numpy.std(measurements)
445        avg_rssi = 0
446        std_dev_rssi = 0
447        result = {
448            'measurements': measurements,
449            'summary': {'avg_distance': avg_distance, 'avg_rssi': avg_rssi,
450                        'std_dev_distance': std_dev_distance, 'std_dev_rssi': std_dev_rssi,
451                        }
452        }
453        return result
454    def do_channel_sounding(self, test_name="cs_log_temp"):
455        self.cs_setprop(self.initiator)
456        self.initiator.button_cord = self.app_setup(self.initiator)
457
458        logging.info("Push [Connect Gatt] on initiator")
459        self._push_button(self.initiator, self.initiator.button_cord["connect_gatt"])
460        found = self._wait_for_keyword(self.initiator, "CYDBG: MTU changed to: 517", start_time=utils.get_current_epoch_time(), timeout=10)
461        time.sleep(MED_SLEEP)
462        if not found:
463            return False
464
465        logging.info("Push [Start CS] on initiator")
466        push_button_time = utils.get_current_epoch_time()
467        self._push_button(self.initiator, self.initiator.button_cord["cs"])
468
469        keyword_finish_procedures = "CYDBG: Add Node {0} with distance".format(self.CSParameters["max_procedure_count"])
470        timeout = self.CSParameters["max_procedure_count"] * self.CSParameters["min_procedure_interval"] *  self.CSParameters["conn_interval"] * 1.25 / 1000 * 1.5 # may need to adjust according to max_procedure_count
471        self._wait_for_keyword(self.initiator, keyword_finish_procedures, push_button_time, timeout)
472        time.sleep(MED_SLEEP)
473
474        log_file_path = self.collect_log_and_rename(self.initiator, test_name=test_name)
475        result = self.parse_bt_cs_results(log_file_path)
476        #self.log.info("Total Test run time: {0} s".format((utils.get_current_epoch_time() - self.begin_time)/1000.0))
477        return result
478
479    def _restart_app(self, device):
480        device.adb.shell("am force-stop com.example.ble_test")
481        time.sleep(2)
482        device.adb.shell("am start -n com.example.ble_test/com.example.ble_test.MainActivity")
483
484    def _restart_bt(self, device):
485        device.adb.shell("cmd bluetooth_manager disable")
486        device.adb.shell("cmd bluetooth_manager wait-for-state:STATE_OFF")
487        device.adb.shell("cmd bluetooth_manager enable")
488
489    def _get_cord(self, p, fi):
490        x1, y1, x2, y2 = p.findall(fi)[0]
491        cord = ((int(x1) + int(x2)) // 2, (int(y1) + int(y2)) // 2)
492        return cord
493
494    def _wait_for_keyword(self, dut, keyword, start_time, timeout=600):
495        wait = True
496        while wait:
497            if utils.get_current_epoch_time() - start_time > timeout * 1000:
498                logging.info("Wait for {0} timeout".format(keyword))
499                return False
500            time.sleep(0.5)
501            result = dut.search_logcat(keyword, start_time, utils.get_current_epoch_time())
502            if len(result) > 0:
503                wait = False
504                logging.info("Found \"{0}\" with wait time {1} s".format(keyword, (
505                            utils.get_current_epoch_time() - start_time) / 1000.0))
506                return True
507
508    def _push_button(self, dut, button_cord):
509        dut.ensure_screen_on()
510        #dut.unlock_screen()
511        time.sleep(SHORT_SLEEP)
512        dut.adb.shell("input tap {0} {1}".format(button_cord[0], button_cord[1]))
513
514    def _read_distance_from_logcat(self, logcat_filename):
515        distance_readout = []
516        counter_readout = []
517        with open(logcat_filename, 'r') as f:
518            for line in f:
519                if "---- Distance" in line:
520                    distance_readout.append(float(line.split()[-2]))
521                if "---- End of Procedure complete counter" in line:
522                    counter_readout.append(int(line.split()[-6].split(':')[-1]))
523
524        distance_readout = numpy.array(distance_readout)
525        counter_readout = numpy.array(counter_readout)
526
527        return counter_readout, distance_readout
528
529    def _get_bt_link_metrics(self, dut, duration=5, bqr_tag='Monitoring , Handle: 0x0040', tag=''):
530        """Get bt link metrics such as rssi and tx pwls.
531
532        Returns:
533            master_metrics_list: list of metrics of central device
534            slave_metrics_list: list of metric of peripheral device
535        """
536
537        self.raw_bt_metrics_path = os.path.join(context.get_current_context().get_full_output_path(),
538                                                'BT_Raw_Metrics')
539
540        # Get master rssi and power level
541        process_data_dict = btutils.get_bt_metric(
542            dut, duration=duration, bqr_tag=bqr_tag, tag=tag, log_path=self.raw_bt_metrics_path)
543        rssi_master = process_data_dict.get('rssi')
544        pwl_master = process_data_dict.get('pwlv')
545        rssi_c0_master = process_data_dict.get('rssi_c0')
546        rssi_c1_master = process_data_dict.get('rssi_c1')
547        txpw_c0_master = process_data_dict.get('txpw_c0')
548        txpw_c1_master = process_data_dict.get('txpw_c1')
549        bftx_master = process_data_dict.get('bftx')
550        divtx_master = process_data_dict.get('divtx')
551        linkquality_master = process_data_dict.get('linkquality')
552
553        condition = False
554        if condition:
555            rssi_slave = btutils.get_bt_rssi(self.bt_device,
556                                             tag=tag,
557                                             log_path=self.raw_bt_metrics_path)
558        else:
559            rssi_slave = None
560
561        master_metrics_list = [
562            rssi_master, pwl_master, rssi_c0_master, rssi_c1_master,
563            txpw_c0_master, txpw_c1_master, bftx_master, divtx_master, linkquality_master
564        ]
565        slave_metrics_list = [rssi_slave]
566
567        # rssi, pwlv, rssi_c0, rssi_c1, txpw_c0, txpw_c1, bftx, divtx
568
569        return master_metrics_list, slave_metrics_list
570
571
572class RangingComparisonTest(base_test.BaseTestClass):
573
574    def __init__(self, controllers):
575        base_test.BaseTestClass.__init__(self, controllers)
576
577    def setup_class(self):
578        """Initializes common test hardware and parameters.
579
580        This function initializes hardware and compiles parameters that are
581        common to all tests in this class.
582        """
583        self.duts = self.android_devices
584        req_params = [
585            'wifi_ranging_params', 'bt_ranging_params', 'uwb_ranging_params'
586        ]
587        self.unpack_userparams(req_params, [])
588
589        self.testclass_results = {}
590
591    def teardown_test(self):
592        self.process_testcase_result()
593
594    def teardown_class(self):
595        user_input = input('Please reconnect {} and press enter when done.'.format(self.duts[-1].serial))
596        time.sleep(MED_SLEEP)
597        self.duts[1].start_services()
598        if self.uwb_ranging_params['enabled']:
599            self.uwb_ranging_link.clean_up_initiator()
600        if self.wifi_ranging_params['11mc_config']['enabled'] or self.wifi_ranging_params['11az_config']['enabled']:
601            self.wifi_ranging_link.teardown_ranging_link()
602        for dev in self.android_devices:
603            wutils.wifi_toggle_state(dev, False)
604            dev.go_to_sleep()
605
606    def reset_remote_devices(self):
607        user_input = input('Ensure {} is connected and press enter when done.'.format(self.duts[-1].serial))
608        for dev in self.android_devices:
609            dev.reboot()
610        # Turn Wifi On
611        for dev in self.android_devices:
612            # self.log.info('Turning on airplane mode.')
613            # try:
614            #     asserts.assert_true(utils.force_airplane_mode(dev, True),
615            #                         'Can not turn on airplane mode.')
616            # except:
617            #     self.log.warning('Could not enable airplane mode!')
618            wutils.reset_wifi(dev)
619            wutils.wifi_toggle_state(dev, True)
620
621        if self.wifi_ranging_params['11mc_config']['enabled'] or self.wifi_ranging_params['11az_config']['enabled']:
622            self.log.info('Setting up WiFi ranging link.')
623            self.wifi_ranging_link = wputils.brcm_utils.RangingLink(self.duts[0], self.duts[1])
624            self.wifi_ranging_link.setup_ranging_link(self.wifi_ranging_params['channel'],
625                                                      self.wifi_ranging_params['bandwidth'],
626                                                      self.wifi_ranging_params['associate_initiator'],
627                                                      '11mc',
628                                                      self.wifi_ranging_params['11az_config']['enabled'])
629        if self.uwb_ranging_params['enabled']:
630            self.log.info('Setting up UWB ranging link')
631            self.uwb_ranging_link = UwbRangingLink(self.duts[1], self.duts[0], self.uwb_ranging_params)
632            self.uwb_ranging_link.clean_up_initiator()
633            self.uwb_ranging_link.setup_initiator()
634        if self.bt_ranging_params['enabled']:
635            self.log.info('Setting up BT ranging link')
636            self.bt_ranging_link = BtRangingLinkV2(self.duts[0], self.duts[1], self.bt_ranging_params)
637            #self.bt_ranging_link.setup_devices()
638            #self.bt_ranging_link.start_advertising()
639
640        # try:
641        #     self.wifi_ranging_link.teardown_ranging_link()
642        # except:
643        #     self.log.warning('Could not tear down wifi link')
644
645        self.duts[1].stop_services()
646        time.sleep(MED_SLEEP)
647        user_input = input('Disconnect {}, position for test, and press enter when done.'.format(self.duts[-1].serial))
648
649    def test_ranging(self):
650
651        self.test_results = collections.OrderedDict()
652        self.log.info('Starting ranging test.')
653        first_reset_needed = 1
654        while 1:
655            location = input('Enter location ID. Enter "Done" to stop the test: ')
656            if location == 'Done':
657                break
658            true_distance = input('Enter true distance (in meters) between devices: ')
659            if first_reset_needed or self.wifi_ranging_params['reset_devices']:
660                self.reset_remote_devices()
661                first_reset_needed = 0
662            location_result = collections.OrderedDict()
663            location_result['location_id'] = location
664            location_result['true_distance'] = true_distance
665            self.log.info('Starting ranging test at location {} with distance {}.'.format(location, true_distance))
666            if self.wifi_ranging_params['11mc_config']['enabled']:
667                self.log.info('Starting WiFi ranging test.')
668                wifi_11mc_ranging_result = self.wifi_ranging_link.measure_range(self.wifi_ranging_params['num_measurements'],
669                                                                           self.wifi_ranging_params['channel'],
670                                                                           self.wifi_ranging_params['11mc_config']['bandwidth'],
671                                                                           self.wifi_ranging_params['11mc_config']['num_frames'],
672                                                                           ranging_method='11mc',
673                                                                           ant_diversity=0)
674                self.log.info(wifi_11mc_ranging_result['summary'])
675                location_result['wifi_11mc_ranging_result'] = wifi_11mc_ranging_result
676                # wifi_11mc_ant_diversity_ranging_result = self.wifi_ranging_link.measure_range(self.wifi_ranging_params['num_measurements'],
677                #                                                            self.wifi_ranging_params['channel'],
678                #                                                            self.wifi_ranging_params['11mc_config']['bandwidth'],
679                #                                                            self.wifi_ranging_params['11mc_config']['num_frames'],
680                #                                                            ranging_method='11mc',
681                #                                                            ant_diversity=1)
682                # self.log.info(wifi_11mc_ant_diversity_ranging_result['summary'])
683                # location_result['wifi_11mc_ant_diversity_ranging_result'] = wifi_11mc_ant_diversity_ranging_result
684            if self.wifi_ranging_params['11az_config']['enabled']:
685                wifi_11az_ranging_result = self.wifi_ranging_link.measure_range(self.wifi_ranging_params['num_measurements'],
686                                                                           self.wifi_ranging_params['channel'],
687                                                                           self.wifi_ranging_params['11az_config']['bandwidth'],
688                                                                           self.wifi_ranging_params['11az_config']['num_frames'],
689                                                                           ranging_method='11az')
690                self.log.info(wifi_11az_ranging_result['summary'])
691                location_result['wifi_11az_ranging_result'] = wifi_11az_ranging_result
692            if self.uwb_ranging_params['enabled']:
693                self.log.info('Starting UWB ranging test.')
694                location_result['uwb_ranging_result'] = self.uwb_ranging_link.measure_range(
695                    duration=self.uwb_ranging_params['duration'])
696                self.log.info(location_result['uwb_ranging_result'])
697            if self.bt_ranging_params['enabled']:
698                self.log.info('Starting BT ranging test.')
699                location_result['bt_ranging_result'] = self.bt_ranging_link.do_channel_sounding(test_name=location)
700                self.log.info(location_result['bt_ranging_result']['summary'])
701            self.test_results[location] = location_result
702            results_file_path = os.path.join(
703                context.get_current_context().get_full_output_path(),
704                '{}.json'.format(self.current_test_name))
705            with open(results_file_path, 'w') as results_file:
706                json.dump(wputils.serialize_dict(self.test_results),
707                          results_file,
708                          indent=4)
709        return self.test_results
710
711
712
713    def process_testcase_result(self):
714        pass