• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import json
18import re
19import subprocess
20import time
21
# Path on the Fuchsia device of the stash file that get_link_keys() copies to
# the host (via scp) and parses for "bonding-data:" entries.
PERSISTENT_BLUETOOTH_STORAGE_LOCATION = "/data/persistent/c1a6d0aebbf7c092c53e8e696636af8ec0629ff39b7f2e548430b0034d809da4/stash_secure.store"
23
24
def le_scan_for_device_by_name(fd,
                               log,
                               search_name,
                               timeout,
                               partial_match=False,
                               self_manage_scan=True):
    """Scan for and return the first BLE advertisement with the device name.

    The discovered-device list is polled once per second (after an initial
    one second wait) until a match is seen or the timeout expires.

    Args:
        fd: The Fuchsia device to start LE scanning on.
        log: The log var passed in from the test.
        search_name: The name to find.
        timeout: How long to scan for, in seconds.
        partial_match: Only do a partial match for the LE advertising name.
          This will return the first result that had a partial match.
        self_manage_scan: Whether or not this function should start/stop (True)
          scans or if the caller should (False).

    Returns:
        The dictionary of device information for the first matching
        advertisement, or None if nothing matched before the timeout.
    """
    if self_manage_scan:
        scan_filter = {"name_substring": search_name}
        fd.gattc_lib.bleStartBleScan(scan_filter)
    found_device = None
    try:
        end_time = time.time() + timeout
        while time.time() < end_time and found_device is None:
            time.sleep(1)
            scan_res = fd.gattc_lib.bleGetDiscoveredDevices()['result']
            for device in scan_res:
                name, did = device["name"], device["id"]
                # A nameless advertisement (name is None) can never be a
                # substring match; guard so `in` does not raise TypeError.
                is_partial_hit = (partial_match and name is not None
                                  and search_name in name)
                if name == search_name or is_partial_hit:
                    log.info(
                        "Successfully found advertisement! name, id: {}, {}".
                        format(name, did))
                    found_device = device
                    # Stop at the first match, as documented.
                    break
    finally:
        # Always stop a scan this function started, even if polling raised.
        if self_manage_scan:
            fd.gattc_lib.bleStopBleScan()
    if found_device is None:
        log.error("Failed to find device with name {}.".format(search_name))
    return found_device
66
67
def bredr_scan_for_device_by_name(fd,
                                  log,
                                  search_name,
                                  timeout,
                                  partial_match=False):
    """Discover and return the id of the first Classic device matching a name.

    Discovery is enabled for the duration of the call and the list of known
    remote devices is polled once per second until a match is seen or the
    timeout expires.

    Args:
        fd: The Fuchsia device to start Classic discovery on.
        log: The log var passed in from the test.
        search_name: The name to find.
        timeout: How long to scan for, in seconds.
        partial_match: Only do a partial match for the search_name.
          This will return the first result that had a partial match.

    Returns:
        The "id" field of the first matching peer, or None if no peer matched
        before the timeout.
    """
    fd.bts_lib.requestDiscovery(True)
    found_device = None
    try:
        end_time = time.time() + timeout
        while time.time() < end_time:
            scan_res = fd.bts_lib.getKnownRemoteDevices()['result']
            for peer in scan_res.values():
                name, did = peer["name"], peer["id"]
                # Peers without a resolved name (None) cannot partially match;
                # guard so `in` does not raise TypeError.
                is_partial_hit = (partial_match and name is not None
                                  and search_name in name)
                if name == search_name or is_partial_hit:
                    log.info("Successfully found peer! name, id: {}, {}".format(
                        name, did))
                    found_device = did
                    # Stop at the first match, as documented.
                    break
            if found_device is not None:
                # Return right away instead of sleeping one more second.
                break
            time.sleep(1)
    finally:
        # Never leave discovery enabled, even if polling raised.
        fd.bts_lib.requestDiscovery(False)
    if found_device is None:
        log.error("Failed to find device with name {}.".format(search_name))
    return found_device
104
105
def unbond_all_known_devices(fd, log):
    """Forget every bonded or connected peer known to the Fuchsia device.

    Args:
        fd: The Fuchsia device to unbond devices from.
        log: The log var passed in from the test.
    """
    bts = fd.bts_lib

    # Take a snapshot of the known peers while discovery is briefly enabled.
    bts.requestDiscovery(True)
    known_peers = bts.getKnownRemoteDevices()['result']
    bts.requestDiscovery(False)

    for peer in known_peers.values():
        if not (peer['bonded'] or peer['connected']):
            continue
        log.info("Unbonding device: {}".format(peer))
        forget_response = bts.forgetDevice(peer['id'])
        log.info(forget_response['result'])
121
122
def verify_device_state_by_name(fd, log, search_name, state, services=None):
    """Verify that a named device reaches the expected connection state.

    Discovery is enabled for the duration of the call and the list of known
    remote devices is polled once per second for up to 10 seconds.

    Args:
        fd: The Fuchsia device whose known remote devices are checked.
        log: The log var passed in from the test.
        search_name: The device name to find (exact match).
        state: The expected state, either "CONNECTED" or "BONDED".
        services: An optional list of services to expect based on the connected
            device. Currently unused (see TODO below).

    Returns:
        True if the device was observed in the expected state within the
        allowed time, otherwise None.
    """
    seconds_allowed_for_state_change = 10
    found_state = None

    fd.bts_lib.requestDiscovery(True)
    try:
        end_time = time.time() + seconds_allowed_for_state_change
        while time.time() < end_time:
            device_list = fd.bts_lib.getKnownRemoteDevices()['result']
            for d in device_list.values():
                if d['name'] != search_name:
                    continue
                if state == "CONNECTED" and d['connected']:
                    log.info("Found connected device {}".format(d))
                    found_state = True
                    break
                if state == "BONDED" and d['bonded']:
                    log.info("Found bonded device {}".format(d))
                    found_state = True
                    break
            if found_state:
                # Return right away instead of sleeping one more second.
                break
            time.sleep(1)
        #TODO: Verify services.
    finally:
        # Never leave discovery enabled, even if polling raised.
        fd.bts_lib.requestDiscovery(False)
    return found_state
158
159
def decode_list_to_link_key(raw_list):
    """Decode the input int list to a hex string link key.

    The values are emitted in reverse order (last element first), each as
    lowercase hex zero-padded to at least two digits. The input list is left
    unmodified.

    Args:
        raw_list: The list of int values (or values accepted by int()) to
            convert.

    Returns:
        A string representation of the link key.
    """
    # reversed() iterates back-to-front without mutating the caller's list.
    return "".join("{:02x}".format(int(item)) for item in reversed(raw_list))
175
176
def _decode_bonding_entry(entry):
    """Parse the first JSON object embedded in one "bonding-data:" entry.

    Args:
        entry: The text that followed a "bonding-data:" marker in the stash
            file; the JSON object is surrounded by unrelated bytes.

    Returns:
        The decoded JSON object.

    Raises:
        json.JSONDecodeError: If the entry holds no parsable JSON object.
    """
    start = entry.find("{")
    if start < 0:
        raise json.JSONDecodeError("No JSON object in bonding data entry",
                                   entry, 0)
    # raw_decode stops at the end of the first complete JSON value, so any
    # trailing stash content (e.g. 'saved_networks', 'ost-data', escape
    # characters) after the object is ignored without manual trimming.
    bonding, _ = json.JSONDecoder().raw_decode(entry, start)
    return bonding


def get_link_keys(fd, save_path):
    """Get Bluetooth link keys and LTKs for input Fuchsia device.

    Copies the device's stash file to save_path with scp and extracts every
    "bonding-data:" entry from it.

    Args:
        fd: The Fuchsia device object (its ssh_config and ip are used for scp).
        save_path: The custom save path on the host for the copied stash file.

    Returns:
        Dictionary of known LTKs and link keys, keyed by the bonding entry's
        "identifier". Each value holds "device_name" and "device_address",
        plus "bredr_link_key" and/or "le_ltk" when the entry has "bredr" /
        "le" data.

    Raises:
        subprocess.CalledProcessError: If the scp copy fails.
        json.JSONDecodeError: If a bonding entry cannot be parsed.
    """
    # Pass scp its arguments as a list (no shell) so paths are never
    # re-interpreted by a shell; check=True avoids silently parsing a stale
    # or missing local copy when the transfer fails.
    subprocess.run([
        "scp", "-F",
        str(fd.ssh_config), "-6",
        f"[{fd.ip}]:{PERSISTENT_BLUETOOTH_STORAGE_LOCATION}",
        str(save_path)
    ],
                   check=True)
    with open(save_path, 'rb') as stash_file:
        stash_secure_output = stash_file.read()
    # The stash is a binary format; drop every non-ASCII byte so the remainder
    # can be handled as text.
    non_ascii_bytes_removed = re.sub(rb'[^\x00-\x7f]', rb'',
                                     stash_secure_output).decode('utf-8')

    # Everything before the first marker is not bonding data.
    bonding_entries = non_ascii_bytes_removed.split("bonding-data:")[1:]
    data_dict = {}
    for entry in bonding_entries:
        json_conversion = _decode_bonding_entry(entry)
        identifier = json_conversion.get("identifier")
        device_name = json_conversion.get("name")

        address_hex = decode_list_to_link_key(
            json_conversion.get("address").get("value"))
        # Group the hex digits into colon separated octets.
        device_address = ':'.join(
            address_hex[i:i + 2] for i in range(0, len(address_hex), 2))

        data_dict[identifier] = {
            "device_name": device_name,
            "device_address": device_address
        }

        bredr_data = json_conversion.get("bredr")
        if bredr_data is not None:
            data_dict[identifier]["bredr_link_key"] = decode_list_to_link_key(
                bredr_data.get("linkKey").get("value"))

        le_data = json_conversion.get("le")
        if le_data is not None:
            data_dict[identifier]["le_ltk"] = decode_list_to_link_key(
                le_data.get("localLtk").get("key").get("value"))

    return data_dict
238