• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (c) 2022 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import argparse
19import ast
20import base64
21import errno
22import json
23import os
24import sys
25import shutil
26import subprocess
27import tempfile
28
29from datetime import datetime
30
31CRED_VERSION = '1.0.1'
32ATTESTATION_KEY_USERPUBLICKEY = 'userPublicKey'
33ATTESTATION_KEY_SIGNATURE = 'signature'
34
35
36def _message(message, file):
37    if message and file:
38        file.write(str(message) + os.linesep)
39
40
41def _error_message(message, file=sys.stderr):
42    _message(message, file)
43
44
45def _info_message(message, file=sys.stdout):
46    _message(message, file)
47
48
49def run_command(command, input_data=None):
50    """
51    input:
52        command: str tuple/list of command and options to compose full cmd
53        input_data: bytes or None that will be redirect to command
54    output:
55        recode: int, non-zero means failure
56        out: bytes or None, stdout info
57        err: bytes or None, stderr info
58    """
59    TIMEOUT_SECONDS = 30
60    try:
61        proc = subprocess.Popen(
62            command,
63            stdin=None if input_data is None else subprocess.PIPE,
64            stdout=subprocess.PIPE,
65            stderr=subprocess.PIPE
66        )
67    except FileNotFoundError:
68        return errno.ENOENT, b'', b'executable not found'
69
70    try:
71        if input_data is None:
72            proc.wait(timeout=TIMEOUT_SECONDS)
73            out = proc.stdout.read()
74            err = proc.stderr.read()
75        else:
76            out, err = proc.communicate(
77                input=input_data, timeout=TIMEOUT_SECONDS)
78    except subprocess.TimeoutExpired:
79        proc.kill()
80        return errno.ETIMEDOUT, b'', b'timeout'
81
82    return proc.returncode, out, err
83
84
85class OpenSslWrapperException(Exception):
86    """raised when openssl execution failed"""
87    pass
88
89
90class OpenSslWrapper:
91    """wrapper for openssl:"""
92
93    PEM_PERFIX = '-----BEGIN PUBLIC KEY-----'
94    PEM_SUFFIX = '-----END PUBLIC KEY-----'
95
96    def __init__(self, store_dir: str = 'artifacts'):
97        self.exe = shutil.which('openssl')
98        if self.exe is None or not os.path.exists(self.exe):
99            raise OpenSslWrapperException('openssl binary not found')
100        store_dir = os.path.join(os.getcwd(), store_dir)
101        if not os.path.exists(store_dir):
102            os.makedirs(store_dir, 0o700)
103        self.store_dir = store_dir
104
105    def generate_ecc_key_pair(self, key_alias: str, curve: str = 'brainpoolP384r1') -> bool:
106        sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
107        pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
108
109        try:
110            if not os.path.isfile(sk_file):
111                code, res, err = run_command([self.exe, 'ecparam', '-genkey', '-name', curve, '-out', sk_file])
112                if code != 0:
113                    raise OpenSslWrapperException('ecparam failed, err: {}'.format(err.decode('utf8')))
114
115            if not os.path.isfile(pk_file):
116                code, res, err = run_command(
117                    [self.exe, 'ec', '-in', sk_file, '-pubout', '-out', pk_file])
118                if code != 0:
119                    raise OpenSslWrapperException(
120                        'ec failed, err: {}'.format(err.decode('utf8')))
121        except OpenSslWrapperException:
122            if os.path.exists(sk_file):
123                os.remove(sk_file)
124            if os.path.exists(pk_file):
125                os.remove(pk_file)
126            return False
127        return True
128
129    def get_ecc_public_key(self, key_alias: str) -> bytes:
130        sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
131        pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
132
133        try:
134            if not os.path.isfile(pk_file):
135                code, res, err = run_command([self.exe, 'ec', '-in', sk_file, '-pubout', '-out', pk_file])
136                if code != 0:
137                    raise OpenSslWrapperException('ec failed, err: {}'.format(err.decode('utf8')))
138            with open(pk_file) as fp:
139                pem_str = fp.read().replace(OpenSslWrapper.PEM_PERFIX, '')
140                pem_str = pem_str.replace(OpenSslWrapper.PEM_SUFFIX, '')
141                pem_str = pem_str.replace(os.linesep, '')
142                return base64.b64decode(pem_str)
143
144        except OpenSslWrapperException as ex:
145            if os.path.exists(sk_file):
146                os.remove(sk_file)
147            if os.path.exists(pk_file):
148                os.remove(pk_file)
149            raise ex
150
151    def digest_sign(self, key_alias: str, content: bytes, digest: str = '-sha384') -> bytes:
152        sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
153
154        if not os.path.isfile(sk_file):
155            raise OpenSslWrapperException('path not exists')
156
157        code, res, err = run_command(
158            [self.exe, 'dgst', digest, '-sign', sk_file, ], content
159        )
160        if code != 0:
161            raise OpenSslWrapperException('dgst failed, err: {}'.format(err.decode('utf8')))
162
163        return res
164
165    def digest_verify_with_key_alias(self, key_alias: str, content: bytes, sig: bytes, digest: str = '-sha384') -> bool:
166        pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
167
168        if not os.path.isfile(pk_file):
169            raise OpenSslWrapperException('path not exists')
170        try:
171            with tempfile.NamedTemporaryFile('wb', dir=self.store_dir, delete=False) as signature_file:
172                signature_file.write(sig)
173                signature_file.close()
174
175            code, res, err = run_command(
176                [self.exe, 'dgst', digest, '-verify', pk_file, '-signature', signature_file.name, ], content
177            )
178        finally:
179            os.remove(signature_file.name)
180        return True if code == 0 else False
181
182    def digest_verify_with_pub_key(self, pub_key: bytes, content: bytes, sig: bytes, digest: str = '-sha384') -> bool:
183        try:
184            with tempfile.NamedTemporaryFile('wb+', dir=self.store_dir, delete=False) as pubkey_file:
185                # create an temp pubkey pem file
186                pub_key_base64 = base64.b64encode(pub_key).decode('utf8')
187
188                pub_key = '{}{}{}{}{}{}'.format(
189                    OpenSslWrapper.PEM_PERFIX, os.linesep,
190                    pub_key_base64, os.linesep,
191                    OpenSslWrapper.PEM_SUFFIX, os.linesep
192                )
193                pubkey_file.write(pub_key.encode('utf8'))
194                pubkey_file.close()
195
196            with tempfile.NamedTemporaryFile('wb', dir=self.store_dir, delete=False) as signature_file:
197                # create an temp signature file
198                signature_file.write(sig)
199                signature_file.close()
200
201            code, res, err = run_command(
202                [self.exe, 'dgst', digest, '-verify', pubkey_file.name, '-signature', signature_file.name, ], content
203            )
204        finally:
205            os.remove(pubkey_file.name)
206            os.remove(signature_file.name)
207
208        return True if code == 0 else False
209
210
211class CredInitializationException(Exception):
212    """raised when CredInitialization execution failed"""
213    pass
214
215
216class CredInitialization:
217    KEY_ALIAS_ROOT = 'root'
218    KEY_ALIAS_OEM = 'oem'
219    KEY_ALIAS_DEVICE = 'device'
220
221    def __init__(self, store_dir: str):
222        self.ssl = OpenSslWrapper(store_dir)
223
224    def process(self):
225        ssl = self.ssl
226        try:
227            success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_ROOT)
228            if not success:
229                raise CredInitializationException('generate_ecc_key_pair root error')
230            success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_OEM)
231            if not success:
232                raise CredInitializationException('generate_ecc_key_pair oem error')
233            success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_DEVICE)
234            if not success:
235                raise CredInitializationException('generate_ecc_key_pair device error')
236
237        except CredInitializationException:
238            _error_message('cred init failed')
239            return
240
241
242class CredCreationException(Exception):
243    """raised when CredInitialization execution failed"""
244    pass
245
246
247class CredCreation:
248    def __init__(self, store_dir: str, file: str, payload: dict):
249        self.ssl = OpenSslWrapper(store_dir)
250        self.payload = payload
251        self.file = file
252
253    def _gene_head(self):
254        head = {"typ": "DSL"}
255        return base64.b64encode(json.dumps(head, ensure_ascii=True).encode('utf8')).decode('utf8')
256
257    def _gene_attestation(self, root_pk: str, root_sign: str,
258                          oem_pk: str, oem_sign: str,
259                          device_pk: str, device_sign: str):
260        data = [
261            {
262                ATTESTATION_KEY_USERPUBLICKEY: device_pk,
263                ATTESTATION_KEY_SIGNATURE: device_sign
264            },
265            {
266                ATTESTATION_KEY_USERPUBLICKEY: oem_pk,
267                ATTESTATION_KEY_SIGNATURE: oem_sign
268            },
269            {
270                ATTESTATION_KEY_USERPUBLICKEY: root_pk,
271                ATTESTATION_KEY_SIGNATURE: root_sign
272            },
273        ]
274        return base64.b64encode(json.dumps(data, ensure_ascii=True).encode('utf8')).decode('utf8')
275
276    def _gene_payload(self):
277        # add sign time
278
279        self.payload['signTime'] = datetime.now().strftime('%Y%m%d%H%M%S')
280        self.payload['version'] = CRED_VERSION
281        return base64.b64encode(json.dumps(self.payload, ensure_ascii=True).encode('utf8')).decode('utf8')
282
283    def process(self):
284        ssl = self.ssl
285        try:
286            # root self signed
287            root_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_ROOT)
288            root_pub_self_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_ROOT, root_pub_bytes)
289            root_pub_str = base64.b64encode(root_pub_bytes).decode('utf8')
290            root_pub_self_sign_str = base64.b64encode(root_pub_self_signed_bytes).decode('utf8')
291
292            # oem signed by root
293            oem_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_OEM)
294            oem_pub_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_ROOT, oem_pub_bytes)
295            oem_pub_signed_str = base64.b64encode(oem_pub_signed_bytes).decode('utf8')
296            oem_pub_str = base64.b64encode(oem_pub_bytes).decode('utf8')
297
298            # device signed by oem
299            device_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_DEVICE)
300            device_pub_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_OEM, device_pub_bytes)
301            device_pub_signed_str = base64.b64encode(device_pub_signed_bytes).decode('utf8')
302            device_pub_str = base64.b64encode(device_pub_bytes).decode('utf8')
303
304            attestation = self._gene_attestation(root_pub_str, root_pub_self_sign_str,
305                                                 oem_pub_str, oem_pub_signed_str,
306                                                 device_pub_str, device_pub_signed_str)
307
308            head = self._gene_head()
309            payload = self._gene_payload()
310            head_payload = '{}.{}'.format(head, payload)
311
312            head_payload_signed_bytes = ssl.digest_sign(
313                CredInitialization.KEY_ALIAS_DEVICE, head_payload.encode('utf8'))
314            head_payload_signed_string = base64.b64encode(head_payload_signed_bytes).decode('utf8')
315            cred = '{}.{}.{}'.format(head_payload, head_payload_signed_string, attestation)
316        except (CredCreationException, OpenSslWrapperException):
317            _error_message('cred create failed, please init first')
318            return
319
320        with open(self.file, 'w') as fp:
321            fp.write(cred)
322
323
324class CredVerificationException(Exception):
325    """raised when CredVerification execution failed"""
326    pass
327
328
329class CredVerification:
330    def __init__(self, store_dir: str, file: str):
331        self.ssl = OpenSslWrapper(store_dir)
332        self.file = file
333
334    def process(self):
335        try:
336            head, payload, signature, attestation = self._split_file(self.file)
337
338            self._check_head(head)
339            self._check_payload(payload)
340            self._check_signature(signature)
341            self._check_attestation(attestation, '{}.{}'.format(head, payload), signature)
342        except CredVerificationException as ex:
343            _error_message(ex)
344            return
345
346        _info_message('verify success!')
347
348    def _check_head(self, header: str):
349        header_str = self._base64decode(header).decode('utf8')
350        header_obj = ast.literal_eval(header_str)
351        if header_obj['typ'] != 'DSL':
352            raise CredVerificationException('head error')
353        _info_message('head:')
354        _info_message(json.dumps(header_obj, indent=2))
355
356    def _check_payload(self, payload: str):
357        payload_str = self._base64decode(payload).decode('utf8')
358        _info_message('payload:')
359        _info_message(json.dumps(json.loads(payload_str), indent=2))
360
361    def _check_signature(self, signature: str):
362        self._base64decode(signature)
363
364    def _check_attestation(self, attestation, payload, payload_sign):
365        ATTES_PARA_LEN = 3
366        attes_str = self._base64decode(attestation).decode('utf8')
367        attes_obj = json.loads(attes_str)
368        if (len(attes_obj) != ATTES_PARA_LEN):
369            raise CredVerificationException('attes para error')
370        ssl = self.ssl
371        device, oem, root = attes_obj
372
373        root_pk_bytes = self._base64decode(root[ATTESTATION_KEY_USERPUBLICKEY])
374        root_self_sign_bytes = self._base64decode(root[ATTESTATION_KEY_SIGNATURE])
375        verify = ssl.digest_verify_with_pub_key(root_pk_bytes, root_pk_bytes, root_self_sign_bytes, '-sha384')
376        if not verify:
377            raise CredVerificationException('root_self_sign verify error')
378
379        oem_pk_bytes = self._base64decode(oem[ATTESTATION_KEY_USERPUBLICKEY])
380        oem_sign_bytes = self._base64decode(oem[ATTESTATION_KEY_SIGNATURE])
381        verify = ssl.digest_verify_with_pub_key(root_pk_bytes, oem_pk_bytes, oem_sign_bytes, '-sha384')
382        if not verify:
383            raise CredVerificationException('oem_sign verify error')
384
385        device_pk_bytes = self._base64decode(device[ATTESTATION_KEY_USERPUBLICKEY])
386        device_sign_bytes = self._base64decode(device[ATTESTATION_KEY_SIGNATURE])
387        verify = ssl.digest_verify_with_pub_key(oem_pk_bytes, device_pk_bytes, device_sign_bytes, '-sha384')
388        if not verify:
389            raise CredVerificationException('device_sign verify error')
390
391        payload_sign_bytes = self._base64decode(payload_sign)
392        verify = ssl.digest_verify_with_pub_key(device_pk_bytes, payload.encode('utf8'),
393                                                payload_sign_bytes, '-sha384')
394        if not verify:
395            verify = ssl.digest_verify_with_pub_key(device_pk_bytes, payload.encode('utf8'),
396                                                    payload_sign_bytes, '-sha256')
397            if not verify:
398                raise CredVerificationException('payload verify error')
399
400    def _split_file(self, cred_file_name: str):
401        out = self._get_file_content(cred_file_name).split('.')
402        CRED_PARA_LEN = 4
403        if (len(out) != CRED_PARA_LEN):
404            raise CredVerificationException("cred para error")
405        return out
406
407    def _get_file_content(self, file_path: str):
408        if not os.path.isfile(file_path):
409            raise CredVerificationException('file {} is not existed'.format(file_path))
410
411        with open(file_path, 'r') as fp:
412            return fp.read().strip()
413
414    def _base64decode(self, content: str):
415        return base64.urlsafe_b64decode(content + '=' * (4 - len(content) % 4))
416
417
418def init_cred(input_args: argparse.Namespace):
419    action = CredInitialization(input_args.dir)
420    action.process()
421
422
423def create_cred(input_args):
424    payload = {k: v for k, v in input_args.__dict__.items() if k not in ['dir', 'cred', 'process', 'strict'] and v}
425    if input_args.strict:
426        init_cred(input_args)
427    acton = CredCreation(input_args.dir, input_args.cred, payload)
428    acton.process()
429    if input_args.strict:
430        shutil.rmtree(input_args.dir)
431
432
433def verify_cred(input_args):
434    acton = CredVerification(input_args.dir, input_args.cred)
435    acton.process()
436
437
438class CredCommand:
439    def _setup_arguments(self, subparsers, config: dict):
440        action = config.get('action')
441        arguments = config.get('arguments')
442        parser = subparsers.add_parser(action.get('name'), help=action.get('help'))
443        parser.set_defaults(process=action.get('process'))
444        for item, arg in arguments.items():
445            parser.add_argument(*arg.get('name'), dest=item,
446                                metavar=arg.get('metavar'), help=arg.get('help'),
447                                required=arg.get('required'), type=arg.get('type'),
448                                choices=arg.get('choices'), default=arg.get('default'))
449
450    def _setup_init_cmd_parses(self, subparsers):
451        cmd_args_def = {
452            'action': {
453                'name': 'init',
454                'help': 'initialization tool for device security level credential',
455                'process': init_cred
456            },
457            'arguments': {
458                'dir': {
459                    'name': ['-d', '--artifacts-dir'],
460                    'metavar': 'dir',
461                    'type': str,
462                    'default': 'artifacts',
463                    'help': 'output artifacts dir',
464                }
465            }
466        }
467        self._setup_arguments(subparsers, cmd_args_def)
468
469    def _setup_create_cmd_parses(self, subparsers):
470        cmd_args_def = {
471            'action': {
472                'name': 'create',
473                'help': 'creation tool for device security level credential',
474                'process': create_cred,
475            },
476            'arguments': {
477                'dir': {
478                    'name': ['-d', '--artifacts-dir'],
479                    'metavar': 'dir',
480                    'type': str,
481                    'default': 'artifacts',
482                    'help': 'input artifacts dir',
483                },
484                'type': {
485                    'name': ['-t', '--field-type'],
486                    'type': str,
487                    'choices': ['debug', 'release'],
488                    'default': 'debug',
489                    'help': 'debug or release',
490                },
491                'manufacture': {
492                    'name': ['-M', '--field-manufacture'],
493                    'type': str,
494                    'help': 'device manufacture info',
495                    'required': True
496                },
497                'brand': {
498                    'name': ['-b', '--field-brand'],
499                    'type': str,
500                    'help': 'device brand info',
501                    'required': True
502                },
503                'model': {
504                    'name': ['-m', '--field-model'],
505                    'type': str,
506                    'help': 'device model info',
507                    'required': True
508                },
509                'udid': {
510                    'name': ['-u', '--field-udid'],
511                    'type': str,
512                    'help': 'device udid info',
513                    'required': False
514                },
515                'sn': {
516                    'name': ['-n', '--field-sn'],
517                    'type': str,
518                    'help': 'device sn info',
519                    'required': False
520                },
521                'softwareVersion': {
522                    'name': ['-s', '--field-software-version'],
523                    'type': str,
524                    'help': 'device software version info',
525                    'required': True
526                },
527                'securityLevel': {
528                    'name': ['-l', '--field-security-level'],
529                    'type': str,
530                    'choices': ['SL1', 'SL2', 'SL3', 'SL4', 'SL5'],
531                    'default': 'SL1',
532                    'help': 'device security security info',
533                },
534                'cred': {
535                    'name': ['-f', '--cred-file'],
536                    'metavar': 'file',
537                    'type': str,
538                    'help': 'the device security level credential file to output',
539                    'required': True
540                },
541                'strict': {
542                    'name': ['--strict'],
543                    'metavar': 'strict',
544                    'type': bool,
545                    'choices': [True, False],
546                    'default': False,
547                    'help': 'clean up the artifacts after process',
548                },
549            }
550        }
551        self._setup_arguments(subparsers, cmd_args_def)
552
553    def _setup_verify_cmd_parses(self, subparsers):
554        cmd_args_def = {
555            'action': {
556                'name': 'verify',
557                'help': 'verification tool for device security level credential',
558                'process': verify_cred
559            },
560            'arguments': {
561                'cred': {
562                    'name': ['-f', '--cred-file'],
563                    'metavar': 'file',
564                    'type': str,
565                    'help': 'the device security level credential file to verify',
566                    'required': True
567                },
568                'dir': {
569                    'name': ['-d', '--artifacts-dir'],
570                    'metavar': 'dir',
571                    'type': str,
572                    'default': 'artifacts',
573                    'help': 'input artifacts dir',
574                }
575            }}
576        self._setup_arguments(subparsers, cmd_args_def)
577
578    def parse_args(self):
579        parser = argparse.ArgumentParser(description='A collection of device security level credential tools')
580        subparsers = parser.add_subparsers(required=True, metavar='action')
581        self._setup_init_cmd_parses(subparsers)
582        self._setup_create_cmd_parses(subparsers)
583        self._setup_verify_cmd_parses(subparsers)
584
585        return parser.parse_args()
586
587
588if __name__ == '__main__':
589    cmd = CredCommand()
590    args = cmd.parse_args()
591    args.process(args)
592