• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Keys and Key Storage
17#
18# -----------------------------------------------------------------------------
19
20# -----------------------------------------------------------------------------
21# Imports
22# -----------------------------------------------------------------------------
23import logging
24import os
25import json
26from colors import color
27
28from .hci import Address
29
30
31# -----------------------------------------------------------------------------
32# Logging
33# -----------------------------------------------------------------------------
34logger = logging.getLogger(__name__)
35
36
37# -----------------------------------------------------------------------------
38class PairingKeys:
39    class Key:
40        def __init__(self, value, authenticated=False, ediv=None, rand=None):
41            self.value         = value
42            self.authenticated = authenticated
43            self.ediv          = ediv
44            self.rand          = rand
45
46        @classmethod
47        def from_dict(cls, key_dict):
48            value = bytes.fromhex(key_dict['value'])
49            authenticated = key_dict.get('authenticated', False)
50            ediv = key_dict.get('ediv')
51            rand = key_dict.get('rand')
52            if rand is not None:
53                rand = bytes.fromhex(rand)
54
55            return cls(value, authenticated, ediv, rand)
56
57        def to_dict(self):
58            key_dict = {'value': self.value.hex(), 'authenticated': self.authenticated}
59            if self.ediv is not None:
60                key_dict['ediv'] = self.ediv
61            if self.rand is not None:
62                key_dict['rand'] = self.rand.hex()
63
64            return key_dict
65
66    def __init__(self):
67        self.address_type   = None
68        self.ltk            = None
69        self.ltk_central    = None
70        self.ltk_peripheral = None
71        self.irk            = None
72        self.csrk           = None
73        self.link_key       = None  # Classic
74
75    @staticmethod
76    def key_from_dict(keys_dict, key_name):
77        key_dict = keys_dict.get(key_name)
78        if key_dict is not None:
79            return PairingKeys.Key.from_dict(key_dict)
80
81    @staticmethod
82    def from_dict(keys_dict):
83        keys = PairingKeys()
84
85        keys.address_type   = keys_dict.get('address_type')
86        keys.ltk            = PairingKeys.key_from_dict(keys_dict, 'ltk')
87        keys.ltk_central    = PairingKeys.key_from_dict(keys_dict, 'ltk_central')
88        keys.ltk_peripheral = PairingKeys.key_from_dict(keys_dict, 'ltk_peripheral')
89        keys.irk            = PairingKeys.key_from_dict(keys_dict, 'irk')
90        keys.csrk           = PairingKeys.key_from_dict(keys_dict, 'csrk')
91        keys.link_key       = PairingKeys.key_from_dict(keys_dict, 'link_key')
92
93        return keys
94
95    def to_dict(self):
96        keys = {}
97
98        if self.address_type is not None:
99            keys['address_type'] = self.address_type
100
101        if self.ltk is not None:
102            keys['ltk'] = self.ltk.to_dict()
103
104        if self.ltk_central is not None:
105            keys['ltk_central'] = self.ltk_central.to_dict()
106
107        if self.ltk_peripheral is not None:
108            keys['ltk_peripheral'] = self.ltk_peripheral.to_dict()
109
110        if self.irk is not None:
111            keys['irk'] = self.irk.to_dict()
112
113        if self.csrk is not None:
114            keys['csrk'] = self.csrk.to_dict()
115
116        if self.link_key is not None:
117            keys['link_key'] = self.link_key.to_dict()
118
119        return keys
120
121    def print(self, prefix=''):
122        keys_dict = self.to_dict()
123        for (property, value) in keys_dict.items():
124            if type(value) is dict:
125                print(f'{prefix}{color(property, "cyan")}:')
126                for (key_property, key_value) in value.items():
127                    print(f'{prefix}  {color(key_property, "green")}: {key_value}')
128            else:
129                print(f'{prefix}{color(property, "cyan")}: {value}')
130
131
132# -----------------------------------------------------------------------------
133class KeyStore:
134    async def delete(self, name):
135        pass
136
137    async def update(self, name, keys):
138        pass
139
140    async def get(self, name):
141        return PairingKeys()
142
143    async def get_all(self):
144        return []
145
146    async def get_resolving_keys(self):
147        all_keys = await self.get_all()
148        resolving_keys = []
149        for (name, keys) in all_keys:
150            if keys.irk is not None:
151                if keys.address_type is None:
152                    address_type = Address.RANDOM_DEVICE_ADDRESS
153                else:
154                    address_type = keys.address_type
155                resolving_keys.append((keys.irk.value, Address(name, address_type)))
156
157        return resolving_keys
158
159    async def print(self, prefix=''):
160        entries = await self.get_all()
161        separator = ''
162        for (name, keys) in entries:
163            print(separator + prefix + color(name, 'yellow'))
164            keys.print(prefix = prefix + '  ')
165            separator = '\n'
166
167    @staticmethod
168    def create_for_device(device_config):
169        if device_config.keystore is None:
170            return None
171
172        keystore_type = device_config.keystore.split(':', 1)[0]
173        if keystore_type == 'JsonKeyStore':
174            return JsonKeyStore.from_device_config(device_config)
175
176        return None
177
178
179# -----------------------------------------------------------------------------
180class JsonKeyStore(KeyStore):
181    APP_NAME          = 'Bumble'
182    APP_AUTHOR        = 'Google'
183    KEYS_DIR          = 'Pairing'
184    DEFAULT_NAMESPACE = '__DEFAULT__'
185
186    def __init__(self, namespace, filename=None):
187        self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE
188
189        if filename is None:
190            # Use a default for the current user
191            import appdirs
192            self.directory_name = os.path.join(
193                appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR),
194                self.KEYS_DIR
195            )
196            json_filename = f'{self.namespace}.json'.lower().replace(':', '-')
197            self.filename = os.path.join(self.directory_name, json_filename)
198        else:
199            self.filename = filename
200            self.directory_name = os.path.dirname(os.path.abspath(self.filename))
201
202        logger.debug(f'JSON keystore: {self.filename}')
203
204    @staticmethod
205    def from_device_config(device_config):
206        params = device_config.keystore.split(':', 1)[1:]
207        namespace = str(device_config.address)
208        if params:
209            filename = params[1]
210        else:
211            filename = None
212
213        return JsonKeyStore(namespace, filename)
214
215    async def load(self):
216        try:
217            with open(self.filename, 'r') as json_file:
218                return json.load(json_file)
219        except FileNotFoundError:
220            return {}
221
222    async def save(self, db):
223        # Create the directory if it doesn't exist
224        if not os.path.exists(self.directory_name):
225            os.makedirs(self.directory_name, exist_ok=True)
226
227        # Save to a temporary file
228        temp_filename = self.filename + '.tmp'
229        with open(temp_filename, 'w') as output:
230            json.dump(db, output, sort_keys=True, indent=4)
231
232        # Atomically replace the previous file
233        os.rename(temp_filename, self.filename)
234
235    async def delete(self, name):
236        db = await self.load()
237
238        namespace = db.get(self.namespace)
239        if namespace is None:
240            raise KeyError(name)
241
242        del namespace[name]
243        await self.save(db)
244
245    async def update(self, name, keys):
246        db = await self.load()
247
248        namespace = db.setdefault(self.namespace, {})
249        namespace[name] = keys.to_dict()
250
251        await self.save(db)
252
253    async def get_all(self):
254        db = await self.load()
255
256        namespace = db.get(self.namespace)
257        if namespace is None:
258            return []
259
260        return [(name, PairingKeys.from_dict(keys)) for (name, keys) in namespace.items()]
261
262    async def get(self, name):
263        db = await self.load()
264
265        namespace = db.get(self.namespace)
266        if namespace is None:
267            return None
268
269        keys = namespace.get(name)
270        if keys is None:
271            return None
272
273        return PairingKeys.from_dict(keys)
274