1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import json 18import re 19import subprocess 20import time 21 22PERSISTENT_BLUETOOTH_STORAGE_LOCATION = "/data/persistent/c1a6d0aebbf7c092c53e8e696636af8ec0629ff39b7f2e548430b0034d809da4/stash_secure.store" 23 24 25def le_scan_for_device_by_name(fd, 26 log, 27 search_name, 28 timeout, 29 partial_match=False, 30 self_manage_scan=True): 31 """Scan for and returns the first BLE advertisement with the device name. 32 33 Args: 34 fd: The Fuchsia device to start LE scanning on. 35 log: The log var passed in from the test. 36 search_name: The name to find. 37 timeout: How long to scan for. 38 partial_match: Only do a partial match for the LE advertising name. 39 This will return the first result that had a partial match. 40 self_manage_scan: Whther or not this function should start/stop (True) 41 scans or if the caller should (False). 42 43 Returns: 44 The dictionary of device information. 45 """ 46 if self_manage_scan: 47 scan_filter = {"name_substring": search_name} 48 fd.gattc_lib.bleStartBleScan(scan_filter) 49 end_time = time.time() + timeout 50 found_device = None 51 while time.time() < end_time and not found_device: 52 time.sleep(1) 53 scan_res = fd.gattc_lib.bleGetDiscoveredDevices()['result'] 54 for device in scan_res: 55 name, did, connectable = device["name"], device["id"], device[ 56 "connectable"] 57 if name == search_name or (partial_match and search_name in name): 58 log.info("Successfully found advertisement! name, id: {}, {}". 59 format(name, did)) 60 found_device = device 61 if self_manage_scan: 62 fd.gattc_lib.bleStopBleScan() 63 if not found_device: 64 log.error("Failed to find device with name {}.".format(search_name)) 65 return found_device 66 67 68def bredr_scan_for_device_by_name(fd, 69 log, 70 search_name, 71 timeout, 72 partial_match=False): 73 """Discover for and returns the first Classic device that matches search_name. 74 75 Args: 76 fd: The Fuchsia device to start Classic discovery on. 77 log: The log var passed in from the test. 78 search_name: The name to find. 79 timeout: How long to scan for. 80 partial_match: Only do a partial match for the search_name. 81 This will return the first result that had a partial match. 82 83 Returns: 84 The dictionary of device information. 85 """ 86 fd.bts_lib.requestDiscovery(True) 87 88 end_time = time.time() + timeout 89 found_device = None 90 while time.time() < end_time and not found_device: 91 scan_res = fd.bts_lib.getKnownRemoteDevices()['result'] 92 for device in scan_res: 93 name, did = scan_res[device]["name"], scan_res[device]["id"] 94 if name == search_name or (partial_match and search_name in name): 95 log.info("Successfully found peer! name, id: {}, {}".format( 96 name, did)) 97 found_device = did 98 time.sleep(1) 99 fd.bts_lib.requestDiscovery(False) 100 if not found_device: 101 log.error("Failed to find device with name {}.".format(search_name)) 102 return found_device 103 return found_device 104 105 106def unbond_all_known_devices(fd, log): 107 """Unbond all known devices from input Fuchsia Device. 108 109 Args: 110 fd: The Fuchsia device to unbond devices from. 111 log: The log var passed in from the test. 112 """ 113 fd.bts_lib.requestDiscovery(True) 114 device_list = fd.bts_lib.getKnownRemoteDevices()['result'] 115 fd.bts_lib.requestDiscovery(False) 116 for device in device_list: 117 d = device_list[device] 118 if d['bonded'] or d['connected']: 119 log.info("Unbonding device: {}".format(d)) 120 log.info(fd.bts_lib.forgetDevice(d['id'])['result']) 121 122 123def verify_device_state_by_name(fd, log, search_name, state, services=None): 124 """Verify a connection state change happened an input device. 125 126 Args: 127 fd: The Fuchsia device to unbond devices from. 128 log: The log var passed in from the test. 129 search_name: The device name to find. 130 state: The expected state. 131 services: An optional list of services to expect based on the connected 132 device. 133 """ 134 fd.bts_lib.requestDiscovery(True) 135 136 seconds_allowed_for_state_change = 10 137 end_time = time.time() + seconds_allowed_for_state_change 138 found_state = None 139 while time.time() < end_time and not found_state: 140 device_list = fd.bts_lib.getKnownRemoteDevices()['result'] 141 for device in device_list: 142 d = device_list[device] 143 name = d['name'] 144 if name == search_name: 145 print(d) 146 if state == "CONNECTED" and d['connected']: 147 log.info("Found connected device {}".format(d)) 148 found_state = True 149 break 150 if state == "BONDED" and d['bonded']: 151 log.info("Found bonded device {}".format(d)) 152 found_state = True 153 break 154 time.sleep(1) 155 #TODO: Verify services. 156 fd.bts_lib.requestDiscovery(False) 157 return found_state 158 159 160def decode_list_to_link_key(raw_list): 161 """ Decodes the input int list to a string link key 162 Args: 163 raw_list: The list of int values to convert 164 Returns: 165 A string represetnation of the link key 166 """ 167 str_list = "" 168 raw_list.reverse() 169 for item in raw_list: 170 check = str(hex(int(item)))[2:] 171 if len(check) == 1: 172 check = "0{}".format(check) 173 str_list += check 174 return str_list 175 176 177def get_link_keys(fd, save_path): 178 """Get Bluetooth link keys and LTKs for input Fuchsia device. 179 180 Args: 181 fd: The Fuchsia device object. 182 save_path: The custom save path. 183 Returns: 184 Dictionary of known LTKs and link keys 185 """ 186 subprocess.run([ 187 f"scp -F {fd.ssh_config} -6 [{fd.ip}]:{PERSISTENT_BLUETOOTH_STORAGE_LOCATION} {save_path}" 188 ], 189 shell=True) 190 stash_secure_output = "" 191 with open(save_path, 'rb') as file: 192 stash_secure_output = file.read() 193 non_ascii_bytes_removed = re.sub(rb'[^\x00-\x7f]', rb'', 194 stash_secure_output).decode('utf-8') 195 196 bonding_data_split = non_ascii_bytes_removed.split("bonding-data:") 197 bonding_data_split.pop(0) 198 data_dict = {} 199 for data in bonding_data_split: 200 if "saved_networks" in data: 201 data = data.split("saved_networks")[0] 202 trailing_data_removed = re.sub(r'^.*?{', '{', data).strip() 203 204 more_trailing_data = trailing_data_removed.rsplit('}', 1)[0] + "}" 205 # Sometimes 'ost-data' will be apended at the end. 206 even_more_trailing_info = more_trailing_data.split('ost-data')[0] 207 208 # Remove the special chars at the end of the string that start with x1b 209 clean_json = more_trailing_data.split('\x1b')[0] 210 211 json_conversion = json.loads(clean_json) 212 identifier = json_conversion.get("identifier") 213 device_name = json_conversion.get("name") 214 215 device_address = decode_list_to_link_key( 216 json_conversion.get("address").get("value")) 217 device_address = ':'.join([ 218 device_address[i:i + 2] for i in range(0, len(device_address), 2) 219 ]) 220 221 data_dict[identifier] = { 222 "device_name": device_name, 223 "device_address": device_address 224 } 225 226 if json_conversion.get("bredr") is not None: 227 link_key = decode_list_to_link_key( 228 json_conversion.get("bredr").get("linkKey").get("value")) 229 data_dict[identifier]["bredr_link_key"] = link_key 230 231 if json_conversion.get("le") is not None: 232 ltk_key = decode_list_to_link_key( 233 json_conversion.get("le").get("localLtk").get("key").get( 234 "value")) 235 data_dict[identifier]["le_ltk"] = ltk_key 236 237 return data_dict 238