#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import ast
import base64
import errno
import json
import os
import sys
import shutil
import subprocess
import tempfile

from datetime import datetime

CRED_VERSION = '1.0.1'
ATTESTATION_KEY_USERPUBLICKEY = 'userPublicKey'
ATTESTATION_KEY_SIGNATURE = 'signature'


def _message(message, file):
    """Write ``message`` plus a line separator to ``file``; do nothing if either is falsy."""
    if message and file:
        file.write(str(message) + os.linesep)


def _error_message(message, file=None):
    """Report ``message`` on ``file`` (``sys.stderr`` when ``file`` is None)."""
    # The stream is resolved at call time so a redirected sys.stderr is honoured.
    _message(message, sys.stderr if file is None else file)


def _info_message(message, file=None):
    """Report ``message`` on ``file`` (``sys.stdout`` when ``file`` is None)."""
    # The stream is resolved at call time so a redirected sys.stdout is honoured.
    _message(message, sys.stdout if file is None else file)


def _decode_output(data):
    """Decode process output for use in error messages without ever raising."""
    return data.decode('utf8', errors='replace') if data else ''


def _str_to_bool(value):
    """
    argparse ``type`` callable turning a command line token into a bool.

    ``type=bool`` must not be used for this: ``bool('False')`` is True, so
    every non-empty token would switch the option on.
    """
    token = str(value).strip().lower()
    if token in ('true', 't', 'yes', 'y', 'on', '1'):
        return True
    # The empty string is kept as "false" because it was the only falsy token
    # when the option was parsed with ``bool``.
    if token in ('false', 'f', 'no', 'n', 'off', '0', ''):
        return False
    raise argparse.ArgumentTypeError('expected True or False, got {!r}'.format(value))


def run_command(command, input_data=None):
    """
    input:
        command: str tuple/list of command and options to compose full cmd
        input_data: bytes or None that will be redirect to command
    output:
        recode: int, non-zero means failure
        out: bytes or None, stdout info
        err: bytes or None, stderr info
    """
    TIMEOUT_SECONDS = 30
    try:
        proc = subprocess.Popen(
            command,
            stdin=None if input_data is None else subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
    except FileNotFoundError:
        return errno.ENOENT, b'', b'executable not found'

    try:
        # communicate() drains stdout/stderr while waiting. Calling wait() on a
        # child whose output is piped can block forever once the pipe buffer
        # fills up, so communicate() is used whether or not there is input.
        out, err = proc.communicate(input=input_data, timeout=TIMEOUT_SECONDS)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed child and close its pipes.
        proc.communicate()
        return errno.ETIMEDOUT, b'', b'timeout'

    return proc.returncode, out, err


class OpenSslWrapperException(Exception):
    """raised when openssl execution failed"""
    pass


class OpenSslWrapper:
    """
    Thin wrapper around the ``openssl`` command line tool.

    Keys are stored as ``<alias>.key.pem`` (private) and ``<alias>.pub.pem``
    (public) inside ``store_dir``, which is resolved against the current
    working directory.
    """

    PEM_PREFIX = '-----BEGIN PUBLIC KEY-----'
    # Historical misspelling, kept as an alias for existing users of the name.
    PEM_PERFIX = PEM_PREFIX
    PEM_SUFFIX = '-----END PUBLIC KEY-----'
    # Width of the base64 lines written into generated PEM files.
    PEM_LINE_WIDTH = 64

    def __init__(self, store_dir: str = 'artifacts'):
        """
        Locate the openssl binary and make sure the key store directory exists.

        raises:
            OpenSslWrapperException: the openssl binary cannot be found
        """
        self.exe = shutil.which('openssl')
        if self.exe is None or not os.path.exists(self.exe):
            raise OpenSslWrapperException('openssl binary not found')
        store_dir = os.path.join(os.getcwd(), store_dir)
        # exist_ok avoids the race between an existence check and the creation.
        os.makedirs(store_dir, 0o700, exist_ok=True)
        self.store_dir = store_dir

    @staticmethod
    def _remove_if_exists(path: str):
        """Delete ``path``; a file that is already gone is not an error."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    @staticmethod
    def _to_pem(pub_key: bytes) -> str:
        """Render public key bytes as PEM text with base64 lines of PEM_LINE_WIDTH chars."""
        encoded = base64.b64encode(pub_key).decode('utf8')
        width = OpenSslWrapper.PEM_LINE_WIDTH
        lines = [OpenSslWrapper.PEM_PREFIX]
        lines.extend(encoded[pos:pos + width] for pos in range(0, len(encoded), width))
        lines.append(OpenSslWrapper.PEM_SUFFIX)
        return os.linesep.join(lines) + os.linesep

    def _private_key_path(self, key_alias: str) -> str:
        """Path of the private key file belonging to ``key_alias``."""
        return os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))

    def _public_key_path(self, key_alias: str) -> str:
        """Path of the public key file belonging to ``key_alias``."""
        return os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))

    def _export_public_key(self, sk_file: str, pk_file: str):
        """Run ``openssl ec -pubout`` on ``sk_file`` writing ``pk_file``; raise on failure."""
        code, _, err = run_command([self.exe, 'ec', '-in', sk_file, '-pubout', '-out', pk_file])
        if code != 0:
            raise OpenSslWrapperException('ec failed, err: {}'.format(_decode_output(err)))

    def _write_temp_file(self, data: bytes) -> str:
        """Write ``data`` to a new temporary file in the store dir and return its path."""
        tmp = tempfile.NamedTemporaryFile('wb', dir=self.store_dir, delete=False)
        try:
            with tmp:
                tmp.write(data)
        except Exception:
            # Do not leave a half written file behind.
            self._remove_if_exists(tmp.name)
            raise
        return tmp.name

    def _verify_with_key_file(self, pk_file: str, content: bytes, sig: bytes, digest: str) -> bool:
        """Verify ``sig`` over ``content`` with the PEM public key stored in ``pk_file``."""
        # openssl reads the signature from a file; the content goes through stdin.
        sig_file = self._write_temp_file(sig)
        try:
            code, _, _ = run_command(
                [self.exe, 'dgst', digest, '-verify', pk_file, '-signature', sig_file], content
            )
        finally:
            self._remove_if_exists(sig_file)
        return code == 0

    def generate_ecc_key_pair(self, key_alias: str, curve: str = 'brainpoolP384r1') -> bool:
        """
        Create the private/public key files of ``key_alias`` when they are missing.

        Existing files are reused. When an openssl call fails both files are
        removed.

        output:
            bool, True on success, False when openssl failed
        """
        sk_file = self._private_key_path(key_alias)
        pk_file = self._public_key_path(key_alias)

        try:
            if not os.path.isfile(sk_file):
                code, _, err = run_command([self.exe, 'ecparam', '-genkey', '-name', curve, '-out', sk_file])
                if code != 0:
                    raise OpenSslWrapperException('ecparam failed, err: {}'.format(_decode_output(err)))

            if not os.path.isfile(pk_file):
                self._export_public_key(sk_file, pk_file)
        except OpenSslWrapperException:
            self._remove_if_exists(sk_file)
            self._remove_if_exists(pk_file)
            return False
        return True

    def get_ecc_public_key(self, key_alias: str) -> bytes:
        """
        Return the public key of ``key_alias`` as the bytes decoded from its PEM body.

        The public key file is derived from the private key when it is missing.

        raises:
            OpenSslWrapperException: openssl could not export the public key;
                both key files of the alias are removed in that case
        """
        sk_file = self._private_key_path(key_alias)
        pk_file = self._public_key_path(key_alias)

        try:
            if not os.path.isfile(pk_file):
                self._export_public_key(sk_file, pk_file)
        except OpenSslWrapperException:
            self._remove_if_exists(sk_file)
            self._remove_if_exists(pk_file)
            raise

        with open(pk_file) as fp:
            pem_str = fp.read()
        pem_str = pem_str.replace(OpenSslWrapper.PEM_PREFIX, '')
        pem_str = pem_str.replace(OpenSslWrapper.PEM_SUFFIX, '')
        # Text mode already normalised the line endings, so strip any whitespace
        # instead of looking for os.linesep.
        return base64.b64decode(''.join(pem_str.split()))

    def digest_sign(self, key_alias: str, content: bytes, digest: str = '-sha384') -> bytes:
        """
        Sign ``content`` with the private key of ``key_alias``.

        input:
            digest: openssl dgst option selecting the hash, e.g. '-sha384'
        output:
            bytes, the stdout of ``openssl dgst -sign``
        raises:
            OpenSslWrapperException: the key file is missing or openssl failed
        """
        sk_file = self._private_key_path(key_alias)

        if not os.path.isfile(sk_file):
            raise OpenSslWrapperException('path not exists')

        code, res, err = run_command([self.exe, 'dgst', digest, '-sign', sk_file], content)
        if code != 0:
            raise OpenSslWrapperException('dgst failed, err: {}'.format(_decode_output(err)))

        return res

    def digest_verify_with_key_alias(self, key_alias: str, content: bytes, sig: bytes, digest: str = '-sha384') -> bool:
        """
        Verify ``sig`` over ``content`` with the stored public key of ``key_alias``.

        raises:
            OpenSslWrapperException: the public key file does not exist
        """
        pk_file = self._public_key_path(key_alias)

        if not os.path.isfile(pk_file):
            raise OpenSslWrapperException('path not exists')
        return self._verify_with_key_file(pk_file, content, sig, digest)

    def digest_verify_with_pub_key(self, pub_key: bytes, content: bytes, sig: bytes, digest: str = '-sha384') -> bool:
        """
        Verify ``sig`` over ``content`` with a public key given as raw bytes.

        The key is written to a temporary PEM file for openssl and removed again
        afterwards, whether or not the verification could be run.
        """
        pk_file = self._write_temp_file(self._to_pem(pub_key).encode('utf8'))
        try:
            return self._verify_with_key_file(pk_file, content, sig, digest)
        finally:
            self._remove_if_exists(pk_file)


class CredInitializationException(Exception):
    """raised when CredInitialization execution failed"""
    pass


class CredInitialization:
    """Creates the root, oem and device key pairs in the artifacts directory."""

    KEY_ALIAS_ROOT = 'root'
    KEY_ALIAS_OEM = 'oem'
    KEY_ALIAS_DEVICE = 'device'

    def __init__(self, store_dir: str):
        self.ssl = OpenSslWrapper(store_dir)

    def process(self):
        """Generate the three key pairs; print an error and stop at the first failure."""
        aliases = (
            CredInitialization.KEY_ALIAS_ROOT,
            CredInitialization.KEY_ALIAS_OEM,
            CredInitialization.KEY_ALIAS_DEVICE,
        )
        try:
            for alias in aliases:
                if not self.ssl.generate_ecc_key_pair(alias):
                    raise CredInitializationException('generate_ecc_key_pair {} error'.format(alias))
        except CredInitializationException:
            _error_message('cred init failed')
            return


class CredCreationException(Exception):
    """raised when CredCreation execution failed"""
    pass


class CredCreation:
    """
    Builds a credential file of the form
    ``<head>.<payload>.<signature>.<attestation>`` where every part is
    base64 encoded.
    """

    def __init__(self, store_dir: str, file: str, payload: dict):
        self.ssl = OpenSslWrapper(store_dir)
        self.payload = payload
        self.file = file

    @staticmethod
    def _encode_json(obj) -> str:
        """Serialise ``obj`` to ASCII JSON and return it base64 encoded as str."""
        return base64.b64encode(json.dumps(obj, ensure_ascii=True).encode('utf8')).decode('utf8')

    def _gene_head(self):
        """Return the encoded credential head."""
        head = {"typ": "DSL"}
        return self._encode_json(head)

    def _gene_attestation(self, root_pk: str, root_sign: str,
                          oem_pk: str, oem_sign: str,
                          device_pk: str, device_sign: str):
        """Return the encoded key chain, ordered device, oem, root."""
        data = [
            {
                ATTESTATION_KEY_USERPUBLICKEY: device_pk,
                ATTESTATION_KEY_SIGNATURE: device_sign
            },
            {
                ATTESTATION_KEY_USERPUBLICKEY: oem_pk,
                ATTESTATION_KEY_SIGNATURE: oem_sign
            },
            {
                ATTESTATION_KEY_USERPUBLICKEY: root_pk,
                ATTESTATION_KEY_SIGNATURE: root_sign
            },
        ]
        return self._encode_json(data)

    def _gene_payload(self):
        """Stamp sign time and version into ``self.payload`` and return it encoded."""
        # add sign time
        self.payload['signTime'] = datetime.now().strftime('%Y%m%d%H%M%S')
        self.payload['version'] = CRED_VERSION
        return self._encode_json(self.payload)

    def _signed_public_key(self, key_alias: str, signer_alias: str):
        """Return (public key, signature by ``signer_alias``) of ``key_alias``, both base64 str."""
        pub_bytes = self.ssl.get_ecc_public_key(key_alias)
        sign_bytes = self.ssl.digest_sign(signer_alias, pub_bytes)
        return base64.b64encode(pub_bytes).decode('utf8'), base64.b64encode(sign_bytes).decode('utf8')

    def _write_cred(self, cred: str):
        """Write ``cred`` to ``self.file``, replacing any previous content."""
        # O_TRUNC matters: without it, overwriting a longer existing file would
        # leave the tail of the old credential behind the new one.
        fo = os.open(self.file, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o640)
        with os.fdopen(fo, "w") as fd:
            fd.write(cred)
            fd.flush()

    def process(self):
        """Assemble, sign and write the credential; print an error if keys are missing."""
        root = CredInitialization.KEY_ALIAS_ROOT
        oem = CredInitialization.KEY_ALIAS_OEM
        device = CredInitialization.KEY_ALIAS_DEVICE
        try:
            # root self signed
            root_pub_str, root_pub_self_sign_str = self._signed_public_key(root, root)
            # oem signed by root
            oem_pub_str, oem_pub_signed_str = self._signed_public_key(oem, root)
            # device signed by oem
            device_pub_str, device_pub_signed_str = self._signed_public_key(device, oem)

            attestation = self._gene_attestation(root_pub_str, root_pub_self_sign_str,
                                                 oem_pub_str, oem_pub_signed_str,
                                                 device_pub_str, device_pub_signed_str)

            head = self._gene_head()
            payload = self._gene_payload()
            head_payload = '{}.{}'.format(head, payload)

            # head and payload are signed together with the device key
            head_payload_signed_bytes = self.ssl.digest_sign(device, head_payload.encode('utf8'))
            head_payload_signed_string = base64.b64encode(head_payload_signed_bytes).decode('utf8')
            cred = '{}.{}.{}'.format(head_payload, head_payload_signed_string, attestation)
        except (CredCreationException, OpenSslWrapperException):
            _error_message('cred create failed, please init first')
            return

        self._write_cred(cred)


class CredVerificationException(Exception):
    """raised when CredVerification execution failed"""
    pass


class CredVerification:
    """Checks the format of a credential file and verifies its signature chain."""

    def __init__(self, store_dir: str, file: str):
        self.ssl = OpenSslWrapper(store_dir)
        self.file = file

    def process(self):
        """Verify ``self.file``; print the decoded parts and the result."""
        try:
            head, payload, signature, attestation = self._split_file(self.file)

            self._check_head(head)
            self._check_payload(payload)
            self._check_signature(signature)
            self._check_attestation(attestation, '{}.{}'.format(head, payload), signature)
        except CredVerificationException as ex:
            _error_message(ex)
            return

        _info_message('verify success!')

    def _decode_json(self, content: str, what: str):
        """Base64 decode ``content`` and parse it as JSON; ``what`` names the part in errors."""
        raw = self._base64decode(content)
        try:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
            return json.loads(raw.decode('utf8'))
        except ValueError as ex:
            raise CredVerificationException('{} format error'.format(what)) from ex

    def _check_head(self, header: str):
        """Require the head to be a JSON object whose ``typ`` is ``DSL``, then print it."""
        # The head is written as JSON, so it is parsed as JSON: a Python literal
        # parser would reject valid JSON such as true/false/null.
        header_obj = self._decode_json(header, 'head')
        if not isinstance(header_obj, dict) or header_obj.get('typ') != 'DSL':
            raise CredVerificationException('head error')
        _info_message('head:')
        _info_message(json.dumps(header_obj, indent=2))

    def _check_payload(self, payload: str):
        """Require the payload to be base64 encoded JSON, then print it."""
        payload_obj = self._decode_json(payload, 'payload')
        _info_message('payload:')
        _info_message(json.dumps(payload_obj, indent=2))

    def _check_signature(self, signature: str):
        """Require the signature to be valid base64."""
        self._base64decode(signature)

    def _attestation_entry(self, entry):
        """Return (public key bytes, signature bytes) of one attestation entry."""
        try:
            pk_str = entry[ATTESTATION_KEY_USERPUBLICKEY]
            sign_str = entry[ATTESTATION_KEY_SIGNATURE]
        except (KeyError, TypeError, IndexError) as ex:
            raise CredVerificationException('attes para error') from ex
        return self._base64decode(pk_str), self._base64decode(sign_str)

    def _check_attestation(self, attestation, payload, payload_sign):
        """
        Verify the key chain and the signature over ``payload``.

        input:
            attestation: base64 encoded JSON list ordered device, oem, root
            payload: the signed text, i.e. '<head>.<payload>' of the credential
            payload_sign: base64 encoded signature over ``payload``
        raises:
            CredVerificationException: malformed input or a failed verification
        """
        ATTES_PARA_LEN = 3
        attes_obj = self._decode_json(attestation, 'attestation')
        if not isinstance(attes_obj, list) or len(attes_obj) != ATTES_PARA_LEN:
            raise CredVerificationException('attes para error')
        ssl = self.ssl
        device, oem, root = attes_obj

        # root key is self signed
        root_pk_bytes, root_self_sign_bytes = self._attestation_entry(root)
        if not ssl.digest_verify_with_pub_key(root_pk_bytes, root_pk_bytes, root_self_sign_bytes, '-sha384'):
            raise CredVerificationException('root_self_sign verify error')

        # oem key is signed by root
        oem_pk_bytes, oem_sign_bytes = self._attestation_entry(oem)
        if not ssl.digest_verify_with_pub_key(root_pk_bytes, oem_pk_bytes, oem_sign_bytes, '-sha384'):
            raise CredVerificationException('oem_sign verify error')

        # device key is signed by oem
        device_pk_bytes, device_sign_bytes = self._attestation_entry(device)
        if not ssl.digest_verify_with_pub_key(oem_pk_bytes, device_pk_bytes, device_sign_bytes, '-sha384'):
            raise CredVerificationException('device_sign verify error')

        # head.payload is signed by the device key; SHA-384 is tried first and
        # SHA-256 is accepted as a fallback.
        payload_bytes = payload.encode('utf8')
        payload_sign_bytes = self._base64decode(payload_sign)
        verify = ssl.digest_verify_with_pub_key(device_pk_bytes, payload_bytes, payload_sign_bytes, '-sha384')
        if not verify:
            verify = ssl.digest_verify_with_pub_key(device_pk_bytes, payload_bytes, payload_sign_bytes, '-sha256')
        if not verify:
            raise CredVerificationException('payload verify error')

    def _split_file(self, cred_file_name: str):
        """Return the four '.'-separated parts of the credential file."""
        out = self._get_file_content(cred_file_name).split('.')
        CRED_PARA_LEN = 4
        if len(out) != CRED_PARA_LEN:
            raise CredVerificationException("cred para error")
        return out

    def _get_file_content(self, file_path: str):
        """Return the stripped text content of ``file_path``."""
        if not os.path.isfile(file_path):
            raise CredVerificationException('file {} is not existed'.format(file_path))

        try:
            with open(file_path, 'r') as fp:
                return fp.read().strip()
        except ValueError as ex:
            # A file that is not valid text cannot be a credential.
            raise CredVerificationException('file {} is not a text file'.format(file_path)) from ex

    def _base64decode(self, content: str):
        """
        Decode standard or url-safe base64, with or without padding.

        raises:
            CredVerificationException: ``content`` is not a base64 string
        """
        if not isinstance(content, str):
            raise CredVerificationException('base64 decode error')
        try:
            # -len % 4 adds exactly the missing padding (none when already aligned).
            return base64.urlsafe_b64decode(content + '=' * (-len(content) % 4))
        except ValueError as ex:
            # binascii.Error is a ValueError.
            raise CredVerificationException('base64 decode error') from ex


def init_cred(input_args: argparse.Namespace):
    """Handler of the ``init`` action."""
    action = CredInitialization(input_args.dir)
    action.process()


def create_cred(input_args):
    """
    Handler of the ``create`` action.

    Every truthy parsed argument other than the tool options becomes a payload
    field. With ``strict`` the keys are generated first and the whole
    artifacts directory is deleted afterwards.
    """
    tool_options = ('dir', 'cred', 'process', 'strict')
    payload = {k: v for k, v in vars(input_args).items() if k not in tool_options and v}
    if input_args.strict:
        init_cred(input_args)
    try:
        action = CredCreation(input_args.dir, input_args.cred, payload)
        action.process()
    finally:
        # In strict mode the generated private keys must not outlive the run,
        # even when the creation raised.
        if input_args.strict:
            shutil.rmtree(input_args.dir)


def verify_cred(input_args):
    """Handler of the ``verify`` action."""
    action = CredVerification(input_args.dir, input_args.cred)
    action.process()


class CredCommand:
    """Command line front end with the ``init``, ``create`` and ``verify`` actions."""

    def _setup_arguments(self, subparsers, config: dict):
        """Register one sub-command described by ``config`` ('action' and 'arguments')."""
        action = config.get('action')
        arguments = config.get('arguments')
        parser = subparsers.add_parser(action.get('name'), help=action.get('help'))
        # The handler is stored on the namespace and invoked by the entry point.
        parser.set_defaults(process=action.get('process'))
        for item, arg in arguments.items():
            parser.add_argument(*arg.get('name'), dest=item,
                                metavar=arg.get('metavar'), help=arg.get('help'),
                                required=arg.get('required'), type=arg.get('type'),
                                choices=arg.get('choices'), default=arg.get('default'))

    def _setup_init_cmd_parses(self, subparsers):
        """Register the ``init`` sub-command."""
        cmd_args_def = {
            'action': {
                'name': 'init',
                'help': 'initialization tool for device security level credential',
                'process': init_cred
            },
            'arguments': {
                'dir': {
                    'name': ['-d', '--artifacts-dir'],
                    'metavar': 'dir',
                    'type': str,
                    'default': 'artifacts',
                    'help': 'output artifacts dir',
                }
            }
        }
        self._setup_arguments(subparsers, cmd_args_def)

    def _setup_create_cmd_parses(self, subparsers):
        """Register the ``create`` sub-command."""
        cmd_args_def = {
            'action': {
                'name': 'create',
                'help': 'creation tool for device security level credential',
                'process': create_cred,
            },
            'arguments': {
                'dir': {
                    'name': ['-d', '--artifacts-dir'],
                    'metavar': 'dir',
                    'type': str,
                    'default': 'artifacts',
                    'help': 'input artifacts dir',
                },
                'type': {
                    'name': ['-t', '--field-type'],
                    'type': str,
                    'choices': ['debug', 'release'],
                    'default': 'debug',
                    'help': 'debug or release',
                },
                'manufacture': {
                    'name': ['-M', '--field-manufacture'],
                    'type': str,
                    'help': 'device manufacture info',
                    'required': True
                },
                'brand': {
                    'name': ['-b', '--field-brand'],
                    'type': str,
                    'help': 'device brand info',
                    'required': True
                },
                'model': {
                    'name': ['-m', '--field-model'],
                    'type': str,
                    'help': 'device model info',
                    'required': True
                },
                'udid': {
                    'name': ['-u', '--field-udid'],
                    'type': str,
                    'help': 'device udid info',
                    'required': False
                },
                'sn': {
                    'name': ['-n', '--field-sn'],
                    'type': str,
                    'help': 'device sn info',
                    'required': False
                },
                'softwareVersion': {
                    'name': ['-s', '--field-software-version'],
                    'type': str,
                    'help': 'device software version info',
                    'required': True
                },
                'securityLevel': {
                    'name': ['-l', '--field-security-level'],
                    'type': str,
                    'choices': ['SL1', 'SL2', 'SL3', 'SL4', 'SL5'],
                    'default': 'SL1',
                    'help': 'device security security info',
                },
                'cred': {
                    'name': ['-f', '--cred-file'],
                    'metavar': 'file',
                    'type': str,
                    'help': 'the device security level credential file to output',
                    'required': True
                },
                'strict': {
                    'name': ['--strict'],
                    'metavar': 'strict',
                    # Not ``bool``: bool('False') is True.
                    'type': _str_to_bool,
                    'choices': [True, False],
                    'default': False,
                    'help': 'clean up the artifacts after process',
                },
            }
        }
        self._setup_arguments(subparsers, cmd_args_def)

    def _setup_verify_cmd_parses(self, subparsers):
        """Register the ``verify`` sub-command."""
        cmd_args_def = {
            'action': {
                'name': 'verify',
                'help': 'verification tool for device security level credential',
                'process': verify_cred
            },
            'arguments': {
                'cred': {
                    'name': ['-f', '--cred-file'],
                    'metavar': 'file',
                    'type': str,
                    'help': 'the device security level credential file to verify',
                    'required': True
                },
                'dir': {
                    'name': ['-d', '--artifacts-dir'],
                    'metavar': 'dir',
                    'type': str,
                    'default': 'artifacts',
                    'help': 'input artifacts dir',
                }
            }}
        self._setup_arguments(subparsers, cmd_args_def)

    def parse_args(self, argv=None):
        """
        Parse the command line.

        input:
            argv: list of str or None; None means ``sys.argv[1:]``
        output:
            argparse.Namespace whose ``process`` attribute is the action handler
        """
        parser = argparse.ArgumentParser(description='A collection of device security level credential tools')
        subparsers = parser.add_subparsers(required=True, metavar='action')
        self._setup_init_cmd_parses(subparsers)
        self._setup_create_cmd_parses(subparsers)
        self._setup_verify_cmd_parses(subparsers)

        return parser.parse_args(argv)


if __name__ == '__main__':
    cmd = CredCommand()
    args = cmd.parse_args()
    args.process(args)