# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Keys and Key Storage
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import os
import json
from colors import color

from .hci import Address


# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
class PairingKeys:
    """
    Set of keys associated with one peer, convertible to and from a
    JSON-compatible dictionary.
    """

    # Names of the attributes that hold a `PairingKeys.Key` (or None), in the
    # order in which they are read and written by `from_dict` / `to_dict`.
    _KEY_NAMES = (
        'ltk',
        'ltk_central',
        'ltk_peripheral',
        'irk',
        'csrk',
        'link_key',
    )

    class Key:
        """
        A single key value with its optional attributes.

        `value` and `rand` are bytes-like objects (they are serialized with
        `.hex()` and parsed with `bytes.fromhex`).
        """

        def __init__(self, value, authenticated=False, ediv=None, rand=None):
            self.value = value
            self.authenticated = authenticated
            self.ediv = ediv
            self.rand = rand

        @classmethod
        def from_dict(cls, key_dict):
            """
            Build a key from a dictionary as produced by `to_dict`.

            Raises KeyError if the mandatory 'value' entry is missing, and
            ValueError if 'value' or 'rand' is not a valid hex string.
            """
            value = bytes.fromhex(key_dict['value'])
            authenticated = key_dict.get('authenticated', False)
            ediv = key_dict.get('ediv')
            rand = key_dict.get('rand')
            if rand is not None:
                rand = bytes.fromhex(rand)

            return cls(value, authenticated, ediv, rand)

        def to_dict(self):
            """
            Return a JSON-compatible dictionary for this key.

            'ediv' and 'rand' are only included when they are set.
            """
            key_dict = {
                'value': self.value.hex(),
                'authenticated': self.authenticated,
            }
            if self.ediv is not None:
                key_dict['ediv'] = self.ediv
            if self.rand is not None:
                key_dict['rand'] = self.rand.hex()

            return key_dict

    def __init__(self):
        self.address_type = None
        self.ltk = None
        self.ltk_central = None
        self.ltk_peripheral = None
        self.irk = None
        self.csrk = None
        self.link_key = None  # Classic

    @staticmethod
    def key_from_dict(keys_dict, key_name):
        """
        Return the key stored under `key_name` in `keys_dict`, or None if
        there is no such entry.
        """
        key_dict = keys_dict.get(key_name)
        if key_dict is None:
            return None

        return PairingKeys.Key.from_dict(key_dict)

    @staticmethod
    def from_dict(keys_dict):
        """
        Build a `PairingKeys` object from a dictionary as produced by
        `to_dict`. Entries that are absent are left as None.
        """
        keys = PairingKeys()

        keys.address_type = keys_dict.get('address_type')
        for key_name in PairingKeys._KEY_NAMES:
            setattr(keys, key_name, PairingKeys.key_from_dict(keys_dict, key_name))

        return keys

    def to_dict(self):
        """
        Return a JSON-compatible dictionary with only the fields that are set.
        """
        keys = {}

        if self.address_type is not None:
            keys['address_type'] = self.address_type

        for key_name in self._KEY_NAMES:
            key = getattr(self, key_name)
            if key is not None:
                keys[key_name] = key.to_dict()

        return keys

    def print(self, prefix=''):
        """
        Print the keys to the standard output, one field per line, with each
        line starting with `prefix`.
        """
        keys_dict = self.to_dict()
        for (field_name, value) in keys_dict.items():
            if isinstance(value, dict):
                # A key: print its name, then each of its properties
                print(f'{prefix}{color(field_name, "cyan")}:')
                for (key_property, key_value) in value.items():
                    print(f'{prefix} {color(key_property, "green")}: {key_value}')
            else:
                print(f'{prefix}{color(field_name, "cyan")}: {value}')


# -----------------------------------------------------------------------------
class KeyStore:
    """
    Base class for key stores.

    The methods of this base class do not store anything: `delete` and
    `update` do nothing, `get` returns an empty `PairingKeys` and `get_all`
    returns an empty list. Subclasses override them.
    """

    async def delete(self, name):
        pass

    async def update(self, name, keys):
        pass

    async def get(self, name):
        return PairingKeys()

    async def get_all(self):
        return []

    async def get_resolving_keys(self):
        """
        Return a list of (irk_value, address) tuples, one for each entry of
        the store that has an IRK.

        The address is built from the entry name, with the entry's address
        type, or `Address.RANDOM_DEVICE_ADDRESS` when no type is stored.
        """
        all_keys = await self.get_all()
        resolving_keys = []
        for (name, keys) in all_keys:
            if keys.irk is None:
                continue

            if keys.address_type is None:
                address_type = Address.RANDOM_DEVICE_ADDRESS
            else:
                address_type = keys.address_type
            resolving_keys.append((keys.irk.value, Address(name, address_type)))

        return resolving_keys

    async def print(self, prefix=''):
        """
        Print all the entries of the store to the standard output, separated
        by an empty line.
        """
        entries = await self.get_all()
        separator = ''
        for (name, keys) in entries:
            print(separator + prefix + color(name, 'yellow'))
            keys.print(prefix=prefix + ' ')
            separator = '\n'

    @staticmethod
    def create_for_device(device_config):
        """
        Create the key store described by `device_config.keystore`, a string
        of the form '<type>[:<parameters>]'.

        Returns None if no key store is configured or if the type is not
        supported ('JsonKeyStore' is the only supported type).
        """
        if device_config.keystore is None:
            return None

        keystore_type = device_config.keystore.split(':', 1)[0]
        if keystore_type == 'JsonKeyStore':
            return JsonKeyStore.from_device_config(device_config)

        return None


# -----------------------------------------------------------------------------
class JsonKeyStore(KeyStore):
    """
    Key store backed by a JSON file.

    The file contains one dictionary per namespace, each mapping an entry
    name to the dictionary representation of its `PairingKeys`.
    """

    APP_NAME = 'Bumble'
    APP_AUTHOR = 'Google'
    KEYS_DIR = 'Pairing'
    DEFAULT_NAMESPACE = '__DEFAULT__'

    def __init__(self, namespace, filename=None):
        """
        `namespace` selects the section of the file to use
        (`DEFAULT_NAMESPACE` when None).
        `filename` is the path of the JSON file. When None, a file named
        after the namespace, in a per-user data directory, is used.
        """
        self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE

        if filename is None:
            # Use a default for the current user
            # (imported here so that the dependency is only needed in this case)
            import appdirs

            self.directory_name = os.path.join(
                appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR),
                self.KEYS_DIR,
            )
            json_filename = f'{self.namespace}.json'.lower().replace(':', '-')
            self.filename = os.path.join(self.directory_name, json_filename)
        else:
            self.filename = filename
            self.directory_name = os.path.dirname(os.path.abspath(self.filename))

        logger.debug('JSON keystore: %s', self.filename)

    @staticmethod
    def from_device_config(device_config):
        """
        Create a key store from `device_config.keystore`, a string of the
        form 'JsonKeyStore[:<filename>]'. The namespace is the string
        representation of `device_config.address`.
        """
        # Splitting at most once yields at most one parameter (the filename),
        # so a filename that itself contains ':' is preserved intact.
        params = device_config.keystore.split(':', 1)[1:]
        namespace = str(device_config.address)
        filename = params[0] if params else None

        return JsonKeyStore(namespace, filename)

    async def load(self):
        """
        Return the content of the JSON file, or an empty dictionary if the
        file does not exist.
        """
        try:
            with open(self.filename, 'r', encoding='utf-8') as json_file:
                return json.load(json_file)
        except FileNotFoundError:
            return {}

    async def save(self, db):
        """
        Write `db` to the JSON file, creating the directory if needed.
        """
        # Create the directory if it doesn't exist
        os.makedirs(self.directory_name, exist_ok=True)

        # Save to a temporary file
        temp_filename = self.filename + '.tmp'
        with open(temp_filename, 'w', encoding='utf-8') as output:
            json.dump(db, output, sort_keys=True, indent=4)

        # Atomically replace the previous file
        # (unlike os.rename, os.replace also overwrites an existing file on
        # Windows)
        os.replace(temp_filename, self.filename)

    async def delete(self, name):
        """
        Remove the entry for `name` from this store's namespace.

        Raises KeyError if the namespace or the entry does not exist.
        """
        db = await self.load()

        namespace = db.get(self.namespace)
        if namespace is None:
            raise KeyError(name)

        del namespace[name]
        await self.save(db)

    async def update(self, name, keys):
        """
        Store `keys` as the entry for `name`, replacing any previous entry.
        """
        db = await self.load()

        namespace = db.setdefault(self.namespace, {})
        namespace[name] = keys.to_dict()

        await self.save(db)

    async def get_all(self):
        """
        Return a list of (name, PairingKeys) tuples for all the entries in
        this store's namespace (an empty list if there is no such namespace).
        """
        db = await self.load()

        namespace = db.get(self.namespace)
        if namespace is None:
            return []

        return [
            (name, PairingKeys.from_dict(keys)) for (name, keys) in namespace.items()
        ]

    async def get(self, name):
        """
        Return the `PairingKeys` stored for `name`, or None if the namespace
        or the entry does not exist.
        """
        db = await self.load()

        namespace = db.get(self.namespace)
        if namespace is None:
            return None

        keys = namespace.get(name)
        if keys is None:
            return None

        return PairingKeys.from_dict(keys)