#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Command line tools for device security level credentials.

Three sub-commands are provided:

* ``init``   - generate the root / oem / device ECC key pairs,
* ``create`` - build a credential file ``head.payload.signature.attestation``,
* ``verify`` - check the signature chain of a credential file.

All cryptographic operations are delegated to the ``openssl`` executable
found on ``PATH``.
"""

import argparse
import ast
import base64
import errno
import json
import os
import sys
import shutil
import subprocess
import tempfile

from datetime import datetime

CRED_VERSION = '1.0.1'
ATTESTATION_KEY_USERPUBLICKEY = 'userPublicKey'
ATTESTATION_KEY_SIGNATURE = 'signature'

# Upper bound, in seconds, for a single external command.
DEFAULT_COMMAND_TIMEOUT_SECONDS = 30
# Column at which the base64 body of a generated PEM file is wrapped.
PEM_LINE_LENGTH = 64


def _message(message, file):
    """Write ``message`` followed by a newline to ``file``.

    Nothing is written when either argument is falsy.
    """
    if message and file:
        # '\n' (not os.linesep): text streams translate the newline themselves.
        file.write(str(message) + '\n')


def _error_message(message, file=None):
    """Write ``message`` to ``file`` (``sys.stderr`` when omitted)."""
    _message(message, sys.stderr if file is None else file)


def _info_message(message, file=None):
    """Write ``message`` to ``file`` (``sys.stdout`` when omitted)."""
    _message(message, sys.stdout if file is None else file)


def _str_to_bool(value):
    """Convert a command line word such as ``true`` / ``False`` to a bool.

    Raises:
        argparse.ArgumentTypeError: when the word is not a known boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ('true', '1', 'yes', 'y', 'on'):
        return True
    if text in ('false', '0', 'no', 'n', 'off', ''):
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean value, got {!r}'.format(value))


def run_command(command, input_data=None,
                timeout=DEFAULT_COMMAND_TIMEOUT_SECONDS):
    """Run an external command and collect its result.

    input:
        command: str tuple/list of command and options to compose full cmd
        input_data: bytes or None that will be redirected to the command
        timeout: seconds to wait before the command is killed
    output:
        retcode: int, non-zero means failure
        out: bytes, stdout info
        err: bytes, stderr info
    """
    try:
        proc = subprocess.Popen(
            command,
            stdin=None if input_data is None else subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
    except FileNotFoundError:
        return errno.ENOENT, b'', b'executable not found'

    try:
        # communicate() drains both pipes while waiting; calling wait()
        # first can block forever once the child fills a pipe buffer.
        out, err = proc.communicate(input=input_data, timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed child and release its pipes.
        proc.communicate()
        return errno.ETIMEDOUT, b'', b'timeout'

    return proc.returncode, out, err


class OpenSslWrapperException(Exception):
    """raised when openssl execution failed"""
    pass


class OpenSslWrapper:
    """Thin wrapper around the ``openssl`` command line tool.

    Key files are kept in ``store_dir`` as ``<alias>.key.pem`` (private key)
    and ``<alias>.pub.pem`` (public key).
    """

    PEM_PREFIX = '-----BEGIN PUBLIC KEY-----'
    # Misspelled historical name, kept so existing references keep working.
    PEM_PERFIX = PEM_PREFIX
    PEM_SUFFIX = '-----END PUBLIC KEY-----'

    def __init__(self, store_dir: str = 'artifacts'):
        """Locate ``openssl`` and make sure the key store directory exists.

        Raises:
            OpenSslWrapperException: when no openssl executable is found.
        """
        self.exe = shutil.which('openssl')
        if self.exe is None or not os.path.exists(self.exe):
            raise OpenSslWrapperException('openssl binary not found')
        store_dir = os.path.join(os.getcwd(), store_dir)
        os.makedirs(store_dir, 0o700, exist_ok=True)
        self.store_dir = store_dir

    def _private_key_path(self, key_alias: str) -> str:
        """Return the path of the private key file for ``key_alias``."""
        return os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))

    def _public_key_path(self, key_alias: str) -> str:
        """Return the path of the public key file for ``key_alias``."""
        return os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))

    def _remove_key_files(self, key_alias: str) -> None:
        """Delete the private and public key files of ``key_alias`` if present."""
        for path in (self._private_key_path(key_alias),
                     self._public_key_path(key_alias)):
            if os.path.exists(path):
                os.remove(path)

    def _export_public_key(self, sk_file: str, pk_file: str) -> None:
        """Derive the public key of ``sk_file`` into ``pk_file``."""
        code, _, err = run_command(
            [self.exe, 'ec', '-in', sk_file, '-pubout', '-out', pk_file])
        if code != 0:
            raise OpenSslWrapperException(
                'ec failed, err: {}'.format(err.decode('utf8', 'replace')))

    def _write_temp_file(self, data: bytes) -> str:
        """Write ``data`` to a new file in the store dir and return its path.

        The caller is responsible for removing the file.
        """
        temp = tempfile.NamedTemporaryFile(
            'wb', dir=self.store_dir, delete=False)
        try:
            with temp:
                temp.write(data)
        except Exception:
            os.remove(temp.name)
            raise
        return temp.name

    def _verify_with_key_file(self, pk_file: str, content: bytes,
                              sig: bytes, digest: str) -> bool:
        """Verify ``sig`` over ``content`` with the PEM public key file."""
        sig_file = self._write_temp_file(sig)
        try:
            code, _, _ = run_command(
                [self.exe, 'dgst', digest, '-verify', pk_file,
                 '-signature', sig_file],
                content
            )
        finally:
            os.remove(sig_file)
        return code == 0

    def generate_ecc_key_pair(self, key_alias: str,
                              curve: str = 'brainpoolP384r1') -> bool:
        """Create the key pair files for ``key_alias`` when they are missing.

        Existing files are reused. When openssl fails, both key files of the
        alias are removed and ``False`` is returned.
        """
        sk_file = self._private_key_path(key_alias)
        pk_file = self._public_key_path(key_alias)

        try:
            if not os.path.isfile(sk_file):
                code, _, err = run_command(
                    [self.exe, 'ecparam', '-genkey', '-name', curve,
                     '-out', sk_file])
                if code != 0:
                    raise OpenSslWrapperException(
                        'ecparam failed, err: {}'.format(
                            err.decode('utf8', 'replace')))

            if not os.path.isfile(pk_file):
                self._export_public_key(sk_file, pk_file)
        except OpenSslWrapperException:
            self._remove_key_files(key_alias)
            return False
        return True

    def get_ecc_public_key(self, key_alias: str) -> bytes:
        """Return the decoded base64 body of the public key PEM file.

        The public key file is derived from the private key when missing.

        Raises:
            OpenSslWrapperException: when the public key cannot be derived;
                both key files of the alias are removed in that case.
        """
        sk_file = self._private_key_path(key_alias)
        pk_file = self._public_key_path(key_alias)

        try:
            if not os.path.isfile(pk_file):
                self._export_public_key(sk_file, pk_file)
        except OpenSslWrapperException:
            self._remove_key_files(key_alias)
            raise

        with open(pk_file) as fp:
            pem_str = fp.read()
        for marker in (OpenSslWrapper.PEM_PREFIX, OpenSslWrapper.PEM_SUFFIX):
            pem_str = pem_str.replace(marker, '')
        # Drop every kind of whitespace, whatever the line ending style.
        return base64.b64decode(''.join(pem_str.split()))

    def digest_sign(self, key_alias: str, content: bytes,
                    digest: str = '-sha384') -> bytes:
        """Sign ``content`` with the private key of ``key_alias``.

        Raises:
            OpenSslWrapperException: when the key file is missing or
                openssl reports a failure.
        """
        sk_file = self._private_key_path(key_alias)

        if not os.path.isfile(sk_file):
            raise OpenSslWrapperException('path not exists')

        code, res, err = run_command(
            [self.exe, 'dgst', digest, '-sign', sk_file], content
        )
        if code != 0:
            raise OpenSslWrapperException(
                'dgst failed, err: {}'.format(err.decode('utf8', 'replace')))

        return res

    def digest_verify_with_key_alias(self, key_alias: str, content: bytes,
                                     sig: bytes,
                                     digest: str = '-sha384') -> bool:
        """Verify ``sig`` over ``content`` with the stored public key.

        Raises:
            OpenSslWrapperException: when the public key file is missing.
        """
        pk_file = self._public_key_path(key_alias)

        if not os.path.isfile(pk_file):
            raise OpenSslWrapperException('path not exists')
        return self._verify_with_key_file(pk_file, content, sig, digest)

    def digest_verify_with_pub_key(self, pub_key: bytes, content: bytes,
                                   sig: bytes,
                                   digest: str = '-sha384') -> bool:
        """Verify ``sig`` over ``content`` with a raw public key.

        ``pub_key`` is the decoded body of a public key PEM file, as returned
        by ``get_ecc_public_key``; it is written to a temporary PEM file for
        openssl and removed afterwards.
        """
        pub_key_base64 = base64.b64encode(pub_key).decode('utf8')
        # Wrap the body so that every PEM parser accepts the file.
        body = os.linesep.join(
            pub_key_base64[start:start + PEM_LINE_LENGTH]
            for start in range(0, len(pub_key_base64), PEM_LINE_LENGTH)
        )
        pem = '{}{}{}{}{}{}'.format(
            OpenSslWrapper.PEM_PREFIX, os.linesep,
            body, os.linesep,
            OpenSslWrapper.PEM_SUFFIX, os.linesep
        )

        pubkey_file = self._write_temp_file(pem.encode('utf8'))
        try:
            return self._verify_with_key_file(pubkey_file, content, sig, digest)
        finally:
            os.remove(pubkey_file)


class CredInitializationException(Exception):
    """raised when CredInitialization execution failed"""
    pass


class CredInitialization:
    """Generate the root, oem and device key pairs."""

    KEY_ALIAS_ROOT = 'root'
    KEY_ALIAS_OEM = 'oem'
    KEY_ALIAS_DEVICE = 'device'

    def __init__(self, store_dir: str):
        self.ssl = OpenSslWrapper(store_dir)

    def process(self):
        """Create every missing key pair.

        Returns:
            True on success; False (after reporting on stderr) as soon as
            one key pair cannot be generated.
        """
        aliases = (
            CredInitialization.KEY_ALIAS_ROOT,
            CredInitialization.KEY_ALIAS_OEM,
            CredInitialization.KEY_ALIAS_DEVICE,
        )
        for alias in aliases:
            if not self.ssl.generate_ecc_key_pair(alias):
                _error_message('cred init failed')
                return False
        return True


class CredCreationException(Exception):
    """raised when CredCreation execution failed"""
    pass


class CredCreation:
    """Build a credential file from a payload and the stored key pairs."""

    def __init__(self, store_dir: str, file: str, payload: dict):
        self.ssl = OpenSslWrapper(store_dir)
        self.payload = payload
        self.file = file

    @staticmethod
    def _encode_json(obj) -> str:
        """Serialize ``obj`` to ASCII JSON and return it base64 encoded."""
        text = json.dumps(obj, ensure_ascii=True)
        return base64.b64encode(text.encode('utf8')).decode('utf8')

    def _gene_head(self):
        """Return the encoded credential head."""
        return self._encode_json({"typ": "DSL"})

    def _gene_attestation(self, root_pk: str, root_sign: str,
                          oem_pk: str, oem_sign: str,
                          device_pk: str, device_sign: str):
        """Return the encoded attestation list, ordered device, oem, root."""
        data = [
            {
                ATTESTATION_KEY_USERPUBLICKEY: device_pk,
                ATTESTATION_KEY_SIGNATURE: device_sign
            },
            {
                ATTESTATION_KEY_USERPUBLICKEY: oem_pk,
                ATTESTATION_KEY_SIGNATURE: oem_sign
            },
            {
                ATTESTATION_KEY_USERPUBLICKEY: root_pk,
                ATTESTATION_KEY_SIGNATURE: root_sign
            },
        ]
        return self._encode_json(data)

    def _gene_payload(self):
        """Stamp ``signTime`` and ``version`` into the payload and encode it.

        Note that ``self.payload`` itself is updated.
        """
        self.payload['signTime'] = datetime.now().strftime('%Y%m%d%H%M%S')
        self.payload['version'] = CRED_VERSION
        return self._encode_json(self.payload)

    def _signed_public_key(self, key_alias: str, signer_alias: str):
        """Return ``(public key, signature)`` of ``key_alias`` as base64 text.

        The signature is made over the public key bytes with the private key
        of ``signer_alias``.
        """
        pub_bytes = self.ssl.get_ecc_public_key(key_alias)
        sign_bytes = self.ssl.digest_sign(signer_alias, pub_bytes)
        return (base64.b64encode(pub_bytes).decode('utf8'),
                base64.b64encode(sign_bytes).decode('utf8'))

    def process(self):
        """Create the credential and write it to ``self.file``.

        Returns:
            True when the file was written; False (after reporting on
            stderr) when signing failed.
        """
        root = CredInitialization.KEY_ALIAS_ROOT
        oem = CredInitialization.KEY_ALIAS_OEM
        device = CredInitialization.KEY_ALIAS_DEVICE
        try:
            # root is self signed, oem is signed by root, device by oem
            root_pk, root_sign = self._signed_public_key(root, root)
            oem_pk, oem_sign = self._signed_public_key(oem, root)
            device_pk, device_sign = self._signed_public_key(device, oem)

            attestation = self._gene_attestation(root_pk, root_sign,
                                                 oem_pk, oem_sign,
                                                 device_pk, device_sign)

            head_payload = '{}.{}'.format(
                self._gene_head(), self._gene_payload())
            signature = self.ssl.digest_sign(
                device, head_payload.encode('utf8'))
            cred = '{}.{}.{}'.format(
                head_payload,
                base64.b64encode(signature).decode('utf8'),
                attestation)
        except (CredCreationException, OpenSslWrapperException):
            _error_message('cred create failed, please init first')
            return False

        with open(self.file, 'w') as fp:
            fp.write(cred)
        return True


class CredVerificationException(Exception):
    """raised when CredVerification execution failed"""
    pass


class CredVerification:
    """Verify the signature chain of a credential file."""

    def __init__(self, store_dir: str, file: str):
        self.ssl = OpenSslWrapper(store_dir)
        self.file = file

    def process(self):
        """Verify ``self.file`` and report the outcome.

        Returns:
            True when every check passed; False after a
            CredVerificationException was reported on stderr.
        """
        try:
            head, payload, signature, attestation = \
                self._split_file(self.file)

            self._check_head(head)
            self._check_payload(payload)
            self._check_signature(signature)
            self._check_attestation(
                attestation, '{}.{}'.format(head, payload), signature)
        except CredVerificationException as ex:
            _error_message(ex)
            return False

        _info_message('verify success!')
        return True

    def _check_head(self, header: str):
        """Decode the head, require ``typ == 'DSL'`` and print it."""
        header_str = self._base64decode(header).decode('utf8')
        # The head is JSON (see the payload check), so parse it as JSON;
        # a Python literal parser rejects true / false / null.
        header_obj = json.loads(header_str)
        if not isinstance(header_obj, dict) or header_obj.get('typ') != 'DSL':
            raise CredVerificationException('head error')
        _info_message('head:')
        _info_message(json.dumps(header_obj, indent=2))

    def _check_payload(self, payload: str):
        """Decode the payload as JSON and print it."""
        payload_str = self._base64decode(payload).decode('utf8')
        _info_message('payload:')
        _info_message(json.dumps(json.loads(payload_str), indent=2))

    def _check_signature(self, signature: str):
        """Check that the signature part is decodable base64."""
        self._base64decode(signature)

    def _decode_attestation_entry(self, entry: dict):
        """Return ``(public key bytes, signature bytes)`` of one entry."""
        return (self._base64decode(entry[ATTESTATION_KEY_USERPUBLICKEY]),
                self._base64decode(entry[ATTESTATION_KEY_SIGNATURE]))

    def _check_attestation(self, attestation, payload, payload_sign):
        """Verify the root -> oem -> device chain and the payload signature.

        Raises:
            CredVerificationException: naming the first check that failed.
        """
        ATTES_PARA_LEN = 3
        attes_str = self._base64decode(attestation).decode('utf8')
        attes_obj = json.loads(attes_str)
        if len(attes_obj) != ATTES_PARA_LEN:
            raise CredVerificationException('attes para error')
        ssl = self.ssl
        device, oem, root = attes_obj

        root_pk, root_sign = self._decode_attestation_entry(root)
        if not ssl.digest_verify_with_pub_key(
                root_pk, root_pk, root_sign, '-sha384'):
            raise CredVerificationException('root_self_sign verify error')

        oem_pk, oem_sign = self._decode_attestation_entry(oem)
        if not ssl.digest_verify_with_pub_key(
                root_pk, oem_pk, oem_sign, '-sha384'):
            raise CredVerificationException('oem_sign verify error')

        device_pk, device_sign = self._decode_attestation_entry(device)
        if not ssl.digest_verify_with_pub_key(
                oem_pk, device_pk, device_sign, '-sha384'):
            raise CredVerificationException('device_sign verify error')

        payload_bytes = payload.encode('utf8')
        payload_sign_bytes = self._base64decode(payload_sign)
        # sha384 is tried first, sha256 only when that fails.
        verified = any(
            ssl.digest_verify_with_pub_key(
                device_pk, payload_bytes, payload_sign_bytes, digest)
            for digest in ('-sha384', '-sha256')
        )
        if not verified:
            raise CredVerificationException('payload verify error')

    def _split_file(self, cred_file_name: str):
        """Return the four dot separated parts of the credential file."""
        CRED_PARA_LEN = 4
        out = self._get_file_content(cred_file_name).split('.')
        if len(out) != CRED_PARA_LEN:
            raise CredVerificationException("cred para error")
        return out

    def _get_file_content(self, file_path: str):
        """Return the stripped text content of ``file_path``."""
        if not os.path.isfile(file_path):
            raise CredVerificationException(
                'file {} is not existed'.format(file_path))

        with open(file_path, 'r') as fp:
            return fp.read().strip()

    def _base64decode(self, content: str):
        """Decode base64 text whose trailing ``=`` padding may be missing."""
        # -len % 4 is 0 for aligned input, so no spurious padding is added.
        return base64.urlsafe_b64decode(content + '=' * (-len(content) % 4))


def init_cred(input_args: argparse.Namespace):
    """Handler of the ``init`` sub-command; returns the success flag."""
    action = CredInitialization(input_args.dir)
    return action.process()


def create_cred(input_args):
    """Handler of the ``create`` sub-command; returns the success flag.

    Every non-empty parsed argument except the tool options becomes a payload
    field. In strict mode the keys are generated first and the artifacts
    directory is removed afterwards, even when creation fails.
    """
    tool_options = ('dir', 'cred', 'process', 'strict')
    payload = {key: value for key, value in vars(input_args).items()
               if key not in tool_options and value}
    try:
        if input_args.strict:
            init_cred(input_args)
        action = CredCreation(input_args.dir, input_args.cred, payload)
        return action.process()
    finally:
        if input_args.strict and os.path.isdir(input_args.dir):
            shutil.rmtree(input_args.dir)


def verify_cred(input_args):
    """Handler of the ``verify`` sub-command; returns the success flag."""
    action = CredVerification(input_args.dir, input_args.cred)
    return action.process()


class CredCommand:
    """Command line definition of the credential tools."""

    def _setup_arguments(self, subparsers, config: dict):
        """Register one sub-command described by ``config``.

        ``config['action']`` holds the sub-command name, help and handler;
        ``config['arguments']`` maps each destination name to its options.
        """
        action = config.get('action')
        arguments = config.get('arguments')
        parser = subparsers.add_parser(
            action.get('name'), help=action.get('help'))
        parser.set_defaults(process=action.get('process'))
        for item, arg in arguments.items():
            parser.add_argument(
                *arg.get('name'), dest=item,
                metavar=arg.get('metavar'), help=arg.get('help'),
                required=arg.get('required'), type=arg.get('type'),
                choices=arg.get('choices'), default=arg.get('default'))

    def _setup_init_cmd_parses(self, subparsers):
        """Register the ``init`` sub-command."""
        cmd_args_def = {
            'action': {
                'name': 'init',
                'help': 'initialization tool for device security level credential',
                'process': init_cred
            },
            'arguments': {
                'dir': {
                    'name': ['-d', '--artifacts-dir'],
                    'metavar': 'dir',
                    'type': str,
                    'default': 'artifacts',
                    'help': 'output artifacts dir',
                }
            }
        }
        self._setup_arguments(subparsers, cmd_args_def)

    def _setup_create_cmd_parses(self, subparsers):
        """Register the ``create`` sub-command."""
        cmd_args_def = {
            'action': {
                'name': 'create',
                'help': 'creation tool for device security level credential',
                'process': create_cred,
            },
            'arguments': {
                'dir': {
                    'name': ['-d', '--artifacts-dir'],
                    'metavar': 'dir',
                    'type': str,
                    'default': 'artifacts',
                    'help': 'input artifacts dir',
                },
                'type': {
                    'name': ['-t', '--field-type'],
                    'type': str,
                    'choices': ['debug', 'release'],
                    'default': 'debug',
                    'help': 'debug or release',
                },
                'manufacture': {
                    'name': ['-M', '--field-manufacture'],
                    'type': str,
                    'help': 'device manufacture info',
                    'required': True
                },
                'brand': {
                    'name': ['-b', '--field-brand'],
                    'type': str,
                    'help': 'device brand info',
                    'required': True
                },
                'model': {
                    'name': ['-m', '--field-model'],
                    'type': str,
                    'help': 'device model info',
                    'required': True
                },
                'udid': {
                    'name': ['-u', '--field-udid'],
                    'type': str,
                    'help': 'device udid info',
                    'required': False
                },
                'sn': {
                    'name': ['-n', '--field-sn'],
                    'type': str,
                    'help': 'device sn info',
                    'required': False
                },
                'softwareVersion': {
                    'name': ['-s', '--field-software-version'],
                    'type': str,
                    'help': 'device software version info',
                    'required': True
                },
                'securityLevel': {
                    'name': ['-l', '--field-security-level'],
                    'type': str,
                    'choices': ['SL1', 'SL2', 'SL3', 'SL4', 'SL5'],
                    'default': 'SL1',
                    'help': 'device security level info',
                },
                'cred': {
                    'name': ['-f', '--cred-file'],
                    'metavar': 'file',
                    'type': str,
                    'help': 'the device security level credential file to output',
                    'required': True
                },
                'strict': {
                    'name': ['--strict'],
                    'metavar': 'strict',
                    # bool() is truthy for every non-empty word, including
                    # "False", so a real parser is required here.
                    'type': _str_to_bool,
                    'choices': [True, False],
                    'default': False,
                    'help': 'clean up the artifacts after process',
                },
            }
        }
        self._setup_arguments(subparsers, cmd_args_def)

    def _setup_verify_cmd_parses(self, subparsers):
        """Register the ``verify`` sub-command."""
        cmd_args_def = {
            'action': {
                'name': 'verify',
                'help': 'verification tool for device security level credential',
                'process': verify_cred
            },
            'arguments': {
                'cred': {
                    'name': ['-f', '--cred-file'],
                    'metavar': 'file',
                    'type': str,
                    'help': 'the device security level credential file to verify',
                    'required': True
                },
                'dir': {
                    'name': ['-d', '--artifacts-dir'],
                    'metavar': 'dir',
                    'type': str,
                    'default': 'artifacts',
                    'help': 'input artifacts dir',
                }
            }}
        self._setup_arguments(subparsers, cmd_args_def)

    def parse_args(self, argv=None):
        """Parse ``argv`` (``sys.argv[1:]`` when omitted).

        The returned namespace carries the selected handler in ``process``.
        """
        parser = argparse.ArgumentParser(
            description='A collection of device security level credential tools')
        subparsers = parser.add_subparsers(required=True, metavar='action')
        self._setup_init_cmd_parses(subparsers)
        self._setup_create_cmd_parses(subparsers)
        self._setup_verify_cmd_parses(subparsers)

        return parser.parse_args(argv)


if __name__ == '__main__':
    cmd = CredCommand()
    args = cmd.parse_args()
    args.process(args)