1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Generate test data 15 16Generate data needed for unit tests, i.e. certificates, keys, and CRLSet. 17""" 18 19import argparse 20import subprocess 21import sys 22from datetime import datetime, timedelta 23from typing import List, Tuple 24 25from cryptography import x509 26from cryptography.hazmat.primitives import hashes 27from cryptography.hazmat.primitives import serialization 28from cryptography.hazmat.primitives.asymmetric import rsa 29from cryptography.x509.oid import NameOID 30 31CERTS_AND_KEYS_HEADER = """// Copyright 2021 The Pigweed Authors 32// 33// Licensed under the Apache License, Version 2.0 (the "License"); you may not 34// use this file except in compliance with the License. You may obtain a copy 35// of the License at 36// 37// https://www.apache.org/licenses/LICENSE-2.0 38// 39// Unless required by applicable law or agreed to in writing, software 40// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 41// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 42// License for the specific language governing permissions and limitations under 43// the License. 44 45#pragma once 46 47#include "pw_bytes/span.h" 48 49""" 50 51 52class Subject: 53 """A subject wraps a name, private key and extensions for issuers 54 to issue its certificate""" 55 def __init__(self, name: str, extensions: List[Tuple[x509.ExtensionType, 56 bool]]): 57 self._subject_name = x509.Name([ 58 x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), 59 x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"California"), 60 x509.NameAttribute(NameOID.LOCALITY_NAME, u"Mountain View"), 61 x509.NameAttribute(NameOID.ORGANIZATION_NAME, name), 62 x509.NameAttribute(NameOID.COMMON_NAME, u"Google-Pigweed"), 63 ]) 64 self._private_key = rsa.generate_private_key(public_exponent=65537, 65 key_size=2048) 66 self._extensions = extensions 67 68 def subject_name(self) -> x509.Name: 69 """Returns the subject name""" 70 return self._subject_name 71 72 def public_key(self) -> rsa.RSAPublicKey: 73 """Returns the public key of this subject""" 74 return self._private_key.public_key() 75 76 def private_key(self) -> rsa.RSAPrivateKey: 77 """Returns the private key of this subject""" 78 return self._private_key 79 80 def extensions(self) -> List[Tuple[x509.ExtensionType, bool]]: 81 """Returns the requested extensions for issuer""" 82 return self._extensions 83 84 85class CA(Subject): 86 """A CA/Sub-ca that issues certificates""" 87 def __init__(self, *args, **kwargs): 88 ext = [(x509.BasicConstraints(True, None), True), 89 (x509.KeyUsage( 90 digital_signature=False, 91 content_commitment=False, 92 key_encipherment=False, 93 data_encipherment=False, 94 key_agreement=False, 95 crl_sign=False, 96 encipher_only=False, 97 decipher_only=False, 98 key_cert_sign=True, 99 ), True)] 100 super().__init__(*args, extensions=ext, **kwargs) 101 102 def sign(self, subject: Subject, not_before: datetime, 103 not_after: datetime) -> x509.Certificate: 104 """Issues a certificate for another CA/Sub-ca/Server""" 105 builder = x509.CertificateBuilder() 106 107 # Subject name is the target's subject name 108 builder = builder.subject_name(subject.subject_name()) 109 110 # Issuer name is this CA/sub-ca's subject name 111 builder = builder.issuer_name(self._subject_name) 112 113 # Public key is the target's public key. 114 builder = builder.public_key(subject.public_key()) 115 116 # Validity period. 117 builder = builder.not_valid_before(not_before).not_valid_after( 118 not_after) 119 120 # Uses a random serial number 121 builder = builder.serial_number(x509.random_serial_number()) 122 123 # Add extensions 124 for extension, critical in subject.extensions(): 125 builder = builder.add_extension(extension, critical) 126 127 # Sign and returns the certificate. 128 return builder.sign(self._private_key, hashes.SHA256()) 129 130 def self_sign(self, not_before: datetime, 131 not_after: datetime) -> x509.Certificate: 132 """Issues a self sign certificate""" 133 return self.sign(self, not_before, not_after) 134 135 136class Server(Subject): 137 """The end-entity server""" 138 def __init__(self, *args, **kwargs): 139 ext = [ 140 (x509.BasicConstraints(False, None), True), 141 (x509.KeyUsage( 142 digital_signature=True, 143 content_commitment=False, 144 key_encipherment=False, 145 data_encipherment=False, 146 key_agreement=False, 147 crl_sign=False, 148 encipher_only=False, 149 decipher_only=False, 150 key_cert_sign=False, 151 ), True), 152 (x509.ExtendedKeyUsage([x509.ExtendedKeyUsageOID.SERVER_AUTH]), 153 True), 154 ] 155 super().__init__(*args, extensions=ext, **kwargs) 156 157 158def c_escaped_string(data: bytes): 159 """Generates a C byte string representation for a byte array 160 161 For example, given a byte sequence of [0x12, 0x34, 0x56]. The function 162 generates the following byte string code: 163 164 {"\x12\x34\x56", 3} 165 """ 166 body = ''.join([f'\\x{b:02x}' for b in data]) 167 return f'{{\"{body}\", {len(data)}}}' 168 169 170def byte_array_declaration(data: bytes, name: str) -> str: 171 """Generates a ConstByteSpan declaration for a byte array""" 172 type_name = '[[maybe_unused]] const pw::ConstByteSpan' 173 array_body = f'std::as_bytes(std::span{c_escaped_string(data)})' 174 return f'{type_name} {name} = {array_body};' 175 176 177class Codegen: 178 """Base helper class for code generation""" 179 def generate_code(self) -> str: 180 """Generates C++ code for this object""" 181 182 183class PrivateKeyGen(Codegen): 184 """Codegen class for a private key""" 185 def __init__(self, key: rsa.RSAPrivateKey, name: str): 186 self._key = key 187 self._name = name 188 189 def generate_code(self) -> str: 190 """Code generation""" 191 return byte_array_declaration( 192 self._key.private_bytes( 193 serialization.Encoding.DER, 194 serialization.PrivateFormat.TraditionalOpenSSL, 195 serialization.NoEncryption()), self._name) 196 197 198class CertificateGen(Codegen): 199 """Codegen class for a single certificate""" 200 def __init__(self, cert: x509.Certificate, name: str): 201 self._cert = cert 202 self._name = name 203 204 def generate_code(self) -> str: 205 """Code generation""" 206 return byte_array_declaration( 207 self._cert.public_bytes(serialization.Encoding.DER), self._name) 208 209 210def generate_test_data() -> str: 211 """Generates test data""" 212 subjects: List[Codegen] = [] 213 214 # Working valid period. 215 # Start from yesterday, to make sure we are in the valid period. 216 not_before = datetime.utcnow() - timedelta(days=1) 217 # Valid for 1 year. 218 not_after = not_before + timedelta(days=365) 219 220 # Generate a root-A CA certificates 221 root_a = CA("root-A") 222 subjects.append( 223 CertificateGen(root_a.self_sign(not_before, not_after), "kRootACert")) 224 225 # Generate a sub CA certificate signed by root-A. 226 sub = CA("sub") 227 subjects.append( 228 CertificateGen(root_a.sign(sub, not_before, not_after), "kSubCACert")) 229 230 # Generate a valid server certificate signed by sub 231 server = Server("server") 232 subjects.append( 233 CertificateGen(sub.sign(server, not_before, not_after), "kServerCert")) 234 subjects.append(PrivateKeyGen(server.private_key(), "kServerKey")) 235 236 root_b = CA("root-B") 237 subjects.append( 238 CertificateGen(root_b.self_sign(not_before, not_after), "kRootBCert")) 239 240 code = 'namespace {\n\n' 241 for subject in subjects: 242 code += subject.generate_code() + '\n\n' 243 code += '}\n' 244 245 return code 246 247 248def clang_format(file): 249 subprocess.run([ 250 "clang-format", 251 "-i", 252 file, 253 ], check=True) 254 255 256def parse_args(): 257 """Setup argparse.""" 258 parser = argparse.ArgumentParser() 259 parser.add_argument( 260 "certs_and_keys_header", 261 help="output header file for test certificates and keys") 262 return parser.parse_args() 263 264 265def main() -> int: 266 """Main""" 267 args = parse_args() 268 269 certs_and_keys = generate_test_data() 270 271 with open(args.certs_and_keys_header, 'w') as header: 272 header.write(CERTS_AND_KEYS_HEADER) 273 header.write(certs_and_keys) 274 275 clang_format(args.certs_and_keys_header) 276 return 0 277 278 279if __name__ == "__main__": 280 sys.exit(main()) 281