• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (c) 2022 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import argparse
19import ast
20import base64
21import errno
22import json
23import os
24import sys
25import shutil
26import subprocess
27import tempfile
28
29from datetime import datetime
30
31CRED_VERSION = '1.0.1'
32ATTESTATION_KEY_USERPUBLICKEY = 'userPublicKey'
33ATTESTATION_KEY_SIGNATURE = 'signature'
34
35
36def _message(message, file):
37    if message and file:
38        file.write(str(message) + os.linesep)
39
40
41def _error_message(message, file=sys.stderr):
42    _message(message, file)
43
44
45def _info_message(message, file=sys.stdout):
46    _message(message, file)
47
48
49def run_command(command, input_data=None):
50    """
51    input:
52        command: str tuple/list of command and options to compose full cmd
53        input_data: bytes or None that will be redirect to command
54    output:
55        recode: int, non-zero means failure
56        out: bytes or None, stdout info
57        err: bytes or None, stderr info
58    """
59    TIMEOUT_SECONDS = 30
60    try:
61        proc = subprocess.Popen(
62            command,
63            stdin=None if input_data is None else subprocess.PIPE,
64            stdout=subprocess.PIPE,
65            stderr=subprocess.PIPE
66        )
67    except FileNotFoundError:
68        return errno.ENOENT, b'', b'executable not found'
69
70    try:
71        if input_data is None:
72            proc.wait(timeout=TIMEOUT_SECONDS)
73            out = proc.stdout.read()
74            err = proc.stderr.read()
75        else:
76            out, err = proc.communicate(
77                input=input_data, timeout=TIMEOUT_SECONDS)
78    except subprocess.TimeoutExpired:
79        proc.kill()
80        return errno.ETIMEDOUT, b'', b'timeout'
81
82    return proc.returncode, out, err
83
84
85class OpenSslWrapperException(Exception):
86    """raised when openssl execution failed"""
87    pass
88
89
90class OpenSslWrapper:
91    """wrapper for openssl:"""
92
93    PEM_PERFIX = '-----BEGIN PUBLIC KEY-----'
94    PEM_SUFFIX = '-----END PUBLIC KEY-----'
95
96    def __init__(self, store_dir: str = 'artifacts'):
97        self.exe = shutil.which('openssl')
98        if self.exe is None or not os.path.exists(self.exe):
99            raise OpenSslWrapperException('openssl binary not found')
100        store_dir = os.path.join(os.getcwd(), store_dir)
101        if not os.path.exists(store_dir):
102            os.makedirs(store_dir, 0o700)
103        self.store_dir = store_dir
104
105    def generate_ecc_key_pair(self, key_alias: str, curve: str = 'brainpoolP384r1') -> bool:
106        sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
107        pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
108
109        try:
110            if not os.path.isfile(sk_file):
111                code, res, err = run_command([self.exe, 'ecparam', '-genkey', '-name', curve, '-out', sk_file])
112                if code != 0:
113                    raise OpenSslWrapperException('ecparam failed, err: {}'.format(err.decode('utf8')))
114
115            if not os.path.isfile(pk_file):
116                code, res, err = run_command(
117                    [self.exe, 'ec', '-in', sk_file, '-pubout', '-out', pk_file])
118                if code != 0:
119                    raise OpenSslWrapperException(
120                        'ec failed, err: {}'.format(err.decode('utf8')))
121        except OpenSslWrapperException:
122            if os.path.exists(sk_file):
123                os.remove(sk_file)
124            if os.path.exists(pk_file):
125                os.remove(pk_file)
126            return False
127        return True
128
129    def get_ecc_public_key(self, key_alias: str) -> bytes:
130        sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
131        pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
132
133        try:
134            if not os.path.isfile(pk_file):
135                code, res, err = run_command([self.exe, 'ec', '-in', sk_file, '-pubout', '-out', pk_file])
136                if code != 0:
137                    raise OpenSslWrapperException('ec failed, err: {}'.format(err.decode('utf8')))
138            with open(pk_file) as fp:
139                pem_str = fp.read().replace(OpenSslWrapper.PEM_PERFIX, '')
140                pem_str = pem_str.replace(OpenSslWrapper.PEM_SUFFIX, '')
141                pem_str = pem_str.replace(os.linesep, '')
142                return base64.b64decode(pem_str)
143
144        except OpenSslWrapperException as ex:
145            if os.path.exists(sk_file):
146                os.remove(sk_file)
147            if os.path.exists(pk_file):
148                os.remove(pk_file)
149            raise ex
150
151    def digest_sign(self, key_alias: str, content: bytes, digest: str = '-sha384') -> bytes:
152        sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
153
154        if not os.path.isfile(sk_file):
155            raise OpenSslWrapperException('path not exists')
156
157        code, res, err = run_command(
158            [self.exe, 'dgst', digest, '-sign', sk_file, ], content
159        )
160        if code != 0:
161            raise OpenSslWrapperException('dgst failed, err: {}'.format(err.decode('utf8')))
162
163        return res
164
165    def digest_verify_with_key_alias(self, key_alias: str, content: bytes, sig: bytes, digest: str = '-sha384') -> bool:
166        pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
167
168        if not os.path.isfile(pk_file):
169            raise OpenSslWrapperException('path not exists')
170        try:
171            with tempfile.NamedTemporaryFile('wb', dir=self.store_dir, delete=False) as signature_file:
172                signature_file.write(sig)
173                signature_file.close()
174
175            code, res, err = run_command(
176                [self.exe, 'dgst', digest, '-verify', pk_file, '-signature', signature_file.name, ], content
177            )
178        finally:
179            os.remove(signature_file.name)
180        return True if code == 0 else False
181
182    def digest_verify_with_pub_key(self, pub_key: bytes, content: bytes, sig: bytes, digest: str = '-sha384') -> bool:
183        try:
184            with tempfile.NamedTemporaryFile('wb+', dir=self.store_dir, delete=False) as pubkey_file:
185                # create an temp pubkey pem file
186                pub_key_base64 = base64.b64encode(pub_key).decode('utf8')
187
188                pub_key = '{}{}{}{}{}{}'.format(
189                    OpenSslWrapper.PEM_PERFIX, os.linesep,
190                    pub_key_base64, os.linesep,
191                    OpenSslWrapper.PEM_SUFFIX, os.linesep
192                )
193                pubkey_file.write(pub_key.encode('utf8'))
194                pubkey_file.close()
195
196            with tempfile.NamedTemporaryFile('wb', dir=self.store_dir, delete=False) as signature_file:
197                # create an temp signature file
198                signature_file.write(sig)
199                signature_file.close()
200
201            code, res, err = run_command(
202                [self.exe, 'dgst', digest, '-verify', pubkey_file.name, '-signature', signature_file.name, ], content
203            )
204        finally:
205            os.remove(pubkey_file.name)
206            os.remove(signature_file.name)
207
208        return True if code == 0 else False
209
210
211class CredInitializationException(Exception):
212    """raised when CredInitialization execution failed"""
213    pass
214
215
216class CredInitialization:
217    KEY_ALIAS_ROOT = 'root'
218    KEY_ALIAS_OEM = 'oem'
219    KEY_ALIAS_DEVICE = 'device'
220
221    def __init__(self, store_dir: str):
222        self.ssl = OpenSslWrapper(store_dir)
223
224    def process(self):
225        ssl = self.ssl
226        try:
227            success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_ROOT)
228            if not success:
229                raise CredInitializationException('generate_ecc_key_pair root error')
230            success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_OEM)
231            if not success:
232                raise CredInitializationException('generate_ecc_key_pair oem error')
233            success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_DEVICE)
234            if not success:
235                raise CredInitializationException('generate_ecc_key_pair device error')
236
237        except CredInitializationException:
238            _error_message('cred init failed')
239            return
240
241
242class CredCreationException(Exception):
243    """raised when CredInitialization execution failed"""
244    pass
245
246
247class CredCreation:
248    def __init__(self, store_dir: str, file: str, payload: dict):
249        self.ssl = OpenSslWrapper(store_dir)
250        self.payload = payload
251        self.file = file
252
253    def _gene_head(self):
254        head = {"typ": "DSL"}
255        return base64.b64encode(json.dumps(head, ensure_ascii=True).encode('utf8')).decode('utf8')
256
257    def _gene_attestation(self, root_pk: str, root_sign: str,
258                          oem_pk: str, oem_sign: str,
259                          device_pk: str, device_sign: str):
260        data = [
261            {
262                ATTESTATION_KEY_USERPUBLICKEY: device_pk,
263                ATTESTATION_KEY_SIGNATURE: device_sign
264            },
265            {
266                ATTESTATION_KEY_USERPUBLICKEY: oem_pk,
267                ATTESTATION_KEY_SIGNATURE: oem_sign
268            },
269            {
270                ATTESTATION_KEY_USERPUBLICKEY: root_pk,
271                ATTESTATION_KEY_SIGNATURE: root_sign
272            },
273        ]
274        return base64.b64encode(json.dumps(data, ensure_ascii=True).encode('utf8')).decode('utf8')
275
276    def _gene_payload(self):
277        # add sign time
278
279        self.payload['signTime'] = datetime.now().strftime('%Y%m%d%H%M%S')
280        self.payload['version'] = CRED_VERSION
281        return base64.b64encode(json.dumps(self.payload, ensure_ascii=True).encode('utf8')).decode('utf8')
282
283    def process(self):
284        ssl = self.ssl
285        try:
286            # root self signed
287            root_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_ROOT)
288            root_pub_self_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_ROOT, root_pub_bytes)
289            root_pub_str = base64.b64encode(root_pub_bytes).decode('utf8')
290            root_pub_self_sign_str = base64.b64encode(root_pub_self_signed_bytes).decode('utf8')
291
292            # oem signed by root
293            oem_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_OEM)
294            oem_pub_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_ROOT, oem_pub_bytes)
295            oem_pub_signed_str = base64.b64encode(oem_pub_signed_bytes).decode('utf8')
296            oem_pub_str = base64.b64encode(oem_pub_bytes).decode('utf8')
297
298            # device signed by oem
299            device_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_DEVICE)
300            device_pub_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_OEM, device_pub_bytes)
301            device_pub_signed_str = base64.b64encode(device_pub_signed_bytes).decode('utf8')
302            device_pub_str = base64.b64encode(device_pub_bytes).decode('utf8')
303
304            attestation = self._gene_attestation(root_pub_str, root_pub_self_sign_str,
305                                                 oem_pub_str, oem_pub_signed_str,
306                                                 device_pub_str, device_pub_signed_str)
307
308            head = self._gene_head()
309            payload = self._gene_payload()
310            head_payload = '{}.{}'.format(head, payload)
311
312            head_payload_signed_bytes = ssl.digest_sign(
313                CredInitialization.KEY_ALIAS_DEVICE, head_payload.encode('utf8'))
314            head_payload_signed_string = base64.b64encode(head_payload_signed_bytes).decode('utf8')
315            cred = '{}.{}.{}'.format(head_payload, head_payload_signed_string, attestation)
316        except (CredCreationException, OpenSslWrapperException):
317            _error_message('cred create failed, please init first')
318            return
319
320        fo = os.open(self.file, os.O_RDWR | os.O_CREAT, 0o640)
321        with os.fdopen(fo, "w+") as fd:
322            fd.write(cred)
323            fd.flush()
324
325
326class CredVerificationException(Exception):
327    """raised when CredVerification execution failed"""
328    pass
329
330
331class CredVerification:
332    def __init__(self, store_dir: str, file: str):
333        self.ssl = OpenSslWrapper(store_dir)
334        self.file = file
335
336    def process(self):
337        try:
338            head, payload, signature, attestation = self._split_file(self.file)
339
340            self._check_head(head)
341            self._check_payload(payload)
342            self._check_signature(signature)
343            self._check_attestation(attestation, '{}.{}'.format(head, payload), signature)
344        except CredVerificationException as ex:
345            _error_message(ex)
346            return
347
348        _info_message('verify success!')
349
350    def _check_head(self, header: str):
351        header_str = self._base64decode(header).decode('utf8')
352        header_obj = ast.literal_eval(header_str)
353        if header_obj['typ'] != 'DSL':
354            raise CredVerificationException('head error')
355        _info_message('head:')
356        _info_message(json.dumps(header_obj, indent=2))
357
358    def _check_payload(self, payload: str):
359        payload_str = self._base64decode(payload).decode('utf8')
360        _info_message('payload:')
361        _info_message(json.dumps(json.loads(payload_str), indent=2))
362
363    def _check_signature(self, signature: str):
364        self._base64decode(signature)
365
366    def _check_attestation(self, attestation, payload, payload_sign):
367        ATTES_PARA_LEN = 3
368        attes_str = self._base64decode(attestation).decode('utf8')
369        attes_obj = json.loads(attes_str)
370        if (len(attes_obj) != ATTES_PARA_LEN):
371            raise CredVerificationException('attes para error')
372        ssl = self.ssl
373        device, oem, root = attes_obj
374
375        root_pk_bytes = self._base64decode(root[ATTESTATION_KEY_USERPUBLICKEY])
376        root_self_sign_bytes = self._base64decode(root[ATTESTATION_KEY_SIGNATURE])
377        verify = ssl.digest_verify_with_pub_key(root_pk_bytes, root_pk_bytes, root_self_sign_bytes, '-sha384')
378        if not verify:
379            raise CredVerificationException('root_self_sign verify error')
380
381        oem_pk_bytes = self._base64decode(oem[ATTESTATION_KEY_USERPUBLICKEY])
382        oem_sign_bytes = self._base64decode(oem[ATTESTATION_KEY_SIGNATURE])
383        verify = ssl.digest_verify_with_pub_key(root_pk_bytes, oem_pk_bytes, oem_sign_bytes, '-sha384')
384        if not verify:
385            raise CredVerificationException('oem_sign verify error')
386
387        device_pk_bytes = self._base64decode(device[ATTESTATION_KEY_USERPUBLICKEY])
388        device_sign_bytes = self._base64decode(device[ATTESTATION_KEY_SIGNATURE])
389        verify = ssl.digest_verify_with_pub_key(oem_pk_bytes, device_pk_bytes, device_sign_bytes, '-sha384')
390        if not verify:
391            raise CredVerificationException('device_sign verify error')
392
393        payload_sign_bytes = self._base64decode(payload_sign)
394        verify = ssl.digest_verify_with_pub_key(device_pk_bytes, payload.encode('utf8'),
395                                                payload_sign_bytes, '-sha384')
396        if not verify:
397            verify = ssl.digest_verify_with_pub_key(device_pk_bytes, payload.encode('utf8'),
398                                                    payload_sign_bytes, '-sha256')
399            if not verify:
400                raise CredVerificationException('payload verify error')
401
402    def _split_file(self, cred_file_name: str):
403        out = self._get_file_content(cred_file_name).split('.')
404        CRED_PARA_LEN = 4
405        if (len(out) != CRED_PARA_LEN):
406            raise CredVerificationException("cred para error")
407        return out
408
409    def _get_file_content(self, file_path: str):
410        if not os.path.isfile(file_path):
411            raise CredVerificationException('file {} is not existed'.format(file_path))
412
413        with open(file_path, 'r') as fp:
414            return fp.read().strip()
415
416    def _base64decode(self, content: str):
417        return base64.urlsafe_b64decode(content + '=' * (4 - len(content) % 4))
418
419
420def init_cred(input_args: argparse.Namespace):
421    action = CredInitialization(input_args.dir)
422    action.process()
423
424
425def create_cred(input_args):
426    payload = {k: v for k, v in input_args.__dict__.items() if k not in ['dir', 'cred', 'process', 'strict'] and v}
427    if input_args.strict:
428        init_cred(input_args)
429    acton = CredCreation(input_args.dir, input_args.cred, payload)
430    acton.process()
431    if input_args.strict:
432        shutil.rmtree(input_args.dir)
433
434
435def verify_cred(input_args):
436    acton = CredVerification(input_args.dir, input_args.cred)
437    acton.process()
438
439
440class CredCommand:
441    def _setup_arguments(self, subparsers, config: dict):
442        action = config.get('action')
443        arguments = config.get('arguments')
444        parser = subparsers.add_parser(action.get('name'), help=action.get('help'))
445        parser.set_defaults(process=action.get('process'))
446        for item, arg in arguments.items():
447            parser.add_argument(*arg.get('name'), dest=item,
448                                metavar=arg.get('metavar'), help=arg.get('help'),
449                                required=arg.get('required'), type=arg.get('type'),
450                                choices=arg.get('choices'), default=arg.get('default'))
451
452    def _setup_init_cmd_parses(self, subparsers):
453        cmd_args_def = {
454            'action': {
455                'name': 'init',
456                'help': 'initialization tool for device security level credential',
457                'process': init_cred
458            },
459            'arguments': {
460                'dir': {
461                    'name': ['-d', '--artifacts-dir'],
462                    'metavar': 'dir',
463                    'type': str,
464                    'default': 'artifacts',
465                    'help': 'output artifacts dir',
466                }
467            }
468        }
469        self._setup_arguments(subparsers, cmd_args_def)
470
471    def _setup_create_cmd_parses(self, subparsers):
472        cmd_args_def = {
473            'action': {
474                'name': 'create',
475                'help': 'creation tool for device security level credential',
476                'process': create_cred,
477            },
478            'arguments': {
479                'dir': {
480                    'name': ['-d', '--artifacts-dir'],
481                    'metavar': 'dir',
482                    'type': str,
483                    'default': 'artifacts',
484                    'help': 'input artifacts dir',
485                },
486                'type': {
487                    'name': ['-t', '--field-type'],
488                    'type': str,
489                    'choices': ['debug', 'release'],
490                    'default': 'debug',
491                    'help': 'debug or release',
492                },
493                'manufacture': {
494                    'name': ['-M', '--field-manufacture'],
495                    'type': str,
496                    'help': 'device manufacture info',
497                    'required': True
498                },
499                'brand': {
500                    'name': ['-b', '--field-brand'],
501                    'type': str,
502                    'help': 'device brand info',
503                    'required': True
504                },
505                'model': {
506                    'name': ['-m', '--field-model'],
507                    'type': str,
508                    'help': 'device model info',
509                    'required': True
510                },
511                'udid': {
512                    'name': ['-u', '--field-udid'],
513                    'type': str,
514                    'help': 'device udid info',
515                    'required': False
516                },
517                'sn': {
518                    'name': ['-n', '--field-sn'],
519                    'type': str,
520                    'help': 'device sn info',
521                    'required': False
522                },
523                'softwareVersion': {
524                    'name': ['-s', '--field-software-version'],
525                    'type': str,
526                    'help': 'device software version info',
527                    'required': True
528                },
529                'securityLevel': {
530                    'name': ['-l', '--field-security-level'],
531                    'type': str,
532                    'choices': ['SL1', 'SL2', 'SL3', 'SL4', 'SL5'],
533                    'default': 'SL1',
534                    'help': 'device security security info',
535                },
536                'cred': {
537                    'name': ['-f', '--cred-file'],
538                    'metavar': 'file',
539                    'type': str,
540                    'help': 'the device security level credential file to output',
541                    'required': True
542                },
543                'strict': {
544                    'name': ['--strict'],
545                    'metavar': 'strict',
546                    'type': bool,
547                    'choices': [True, False],
548                    'default': False,
549                    'help': 'clean up the artifacts after process',
550                },
551            }
552        }
553        self._setup_arguments(subparsers, cmd_args_def)
554
555    def _setup_verify_cmd_parses(self, subparsers):
556        cmd_args_def = {
557            'action': {
558                'name': 'verify',
559                'help': 'verification tool for device security level credential',
560                'process': verify_cred
561            },
562            'arguments': {
563                'cred': {
564                    'name': ['-f', '--cred-file'],
565                    'metavar': 'file',
566                    'type': str,
567                    'help': 'the device security level credential file to verify',
568                    'required': True
569                },
570                'dir': {
571                    'name': ['-d', '--artifacts-dir'],
572                    'metavar': 'dir',
573                    'type': str,
574                    'default': 'artifacts',
575                    'help': 'input artifacts dir',
576                }
577            }}
578        self._setup_arguments(subparsers, cmd_args_def)
579
580    def parse_args(self):
581        parser = argparse.ArgumentParser(description='A collection of device security level credential tools')
582        subparsers = parser.add_subparsers(required=True, metavar='action')
583        self._setup_init_cmd_parses(subparsers)
584        self._setup_create_cmd_parses(subparsers)
585        self._setup_verify_cmd_parses(subparsers)
586
587        return parser.parse_args()
588
589
590if __name__ == '__main__':
591    cmd = CredCommand()
592    args = cmd.parse_args()
593    args.process(args)
594