• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Generate test data
15
16Generate data needed for unit tests, i.e. certificates, keys, and CRLSet.
17"""
18
19import argparse
20import subprocess
21import sys
22from datetime import datetime, timedelta
23from typing import List, Tuple
24
25from cryptography import x509
26from cryptography.hazmat.primitives import hashes
27from cryptography.hazmat.primitives import serialization
28from cryptography.hazmat.primitives.asymmetric import rsa
29from cryptography.x509.oid import NameOID
30
31CERTS_AND_KEYS_HEADER = """// Copyright 2021 The Pigweed Authors
32//
33// Licensed under the Apache License, Version 2.0 (the "License"); you may not
34// use this file except in compliance with the License. You may obtain a copy
35// of the License at
36//
37//     https://www.apache.org/licenses/LICENSE-2.0
38//
39// Unless required by applicable law or agreed to in writing, software
40// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
41// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
42// License for the specific language governing permissions and limitations under
43// the License.
44
45#pragma once
46
47#include "pw_bytes/span.h"
48
49"""
50
51
52class Subject:
53    """A subject wraps a name, private key and extensions for issuers
54    to issue its certificate"""
55    def __init__(self, name: str, extensions: List[Tuple[x509.ExtensionType,
56                                                         bool]]):
57        self._subject_name = x509.Name([
58            x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
59            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"California"),
60            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Mountain View"),
61            x509.NameAttribute(NameOID.ORGANIZATION_NAME, name),
62            x509.NameAttribute(NameOID.COMMON_NAME, u"Google-Pigweed"),
63        ])
64        self._private_key = rsa.generate_private_key(public_exponent=65537,
65                                                     key_size=2048)
66        self._extensions = extensions
67
68    def subject_name(self) -> x509.Name:
69        """Returns the subject name"""
70        return self._subject_name
71
72    def public_key(self) -> rsa.RSAPublicKey:
73        """Returns the public key of this subject"""
74        return self._private_key.public_key()
75
76    def private_key(self) -> rsa.RSAPrivateKey:
77        """Returns the private key of this subject"""
78        return self._private_key
79
80    def extensions(self) -> List[Tuple[x509.ExtensionType, bool]]:
81        """Returns the requested extensions for issuer"""
82        return self._extensions
83
84
85class CA(Subject):
86    """A CA/Sub-ca that issues certificates"""
87    def __init__(self, *args, **kwargs):
88        ext = [(x509.BasicConstraints(True, None), True),
89               (x509.KeyUsage(
90                   digital_signature=False,
91                   content_commitment=False,
92                   key_encipherment=False,
93                   data_encipherment=False,
94                   key_agreement=False,
95                   crl_sign=False,
96                   encipher_only=False,
97                   decipher_only=False,
98                   key_cert_sign=True,
99               ), True)]
100        super().__init__(*args, extensions=ext, **kwargs)
101
102    def sign(self, subject: Subject, not_before: datetime,
103             not_after: datetime) -> x509.Certificate:
104        """Issues a certificate for another CA/Sub-ca/Server"""
105        builder = x509.CertificateBuilder()
106
107        # Subject name is the target's subject name
108        builder = builder.subject_name(subject.subject_name())
109
110        # Issuer name is this CA/sub-ca's subject name
111        builder = builder.issuer_name(self._subject_name)
112
113        # Public key is the target's public key.
114        builder = builder.public_key(subject.public_key())
115
116        # Validity period.
117        builder = builder.not_valid_before(not_before).not_valid_after(
118            not_after)
119
120        # Uses a random serial number
121        builder = builder.serial_number(x509.random_serial_number())
122
123        # Add extensions
124        for extension, critical in subject.extensions():
125            builder = builder.add_extension(extension, critical)
126
127        # Sign and returns the certificate.
128        return builder.sign(self._private_key, hashes.SHA256())
129
130    def self_sign(self, not_before: datetime,
131                  not_after: datetime) -> x509.Certificate:
132        """Issues a self sign certificate"""
133        return self.sign(self, not_before, not_after)
134
135
136class Server(Subject):
137    """The end-entity server"""
138    def __init__(self, *args, **kwargs):
139        ext = [
140            (x509.BasicConstraints(False, None), True),
141            (x509.KeyUsage(
142                digital_signature=True,
143                content_commitment=False,
144                key_encipherment=False,
145                data_encipherment=False,
146                key_agreement=False,
147                crl_sign=False,
148                encipher_only=False,
149                decipher_only=False,
150                key_cert_sign=False,
151            ), True),
152            (x509.ExtendedKeyUsage([x509.ExtendedKeyUsageOID.SERVER_AUTH]),
153             True),
154        ]
155        super().__init__(*args, extensions=ext, **kwargs)
156
157
158def c_escaped_string(data: bytes):
159    """Generates a C byte string representation for a byte array
160
161    For example, given a byte sequence of [0x12, 0x34, 0x56]. The function
162    generates the following byte string code:
163
164            {"\x12\x34\x56", 3}
165    """
166    body = ''.join([f'\\x{b:02x}' for b in data])
167    return f'{{\"{body}\", {len(data)}}}'
168
169
170def byte_array_declaration(data: bytes, name: str) -> str:
171    """Generates a ConstByteSpan declaration for a byte array"""
172    type_name = '[[maybe_unused]] const pw::ConstByteSpan'
173    array_body = f'std::as_bytes(std::span{c_escaped_string(data)})'
174    return f'{type_name} {name} = {array_body};'
175
176
177class Codegen:
178    """Base helper class for code generation"""
179    def generate_code(self) -> str:
180        """Generates C++ code for this object"""
181
182
183class PrivateKeyGen(Codegen):
184    """Codegen class for a private key"""
185    def __init__(self, key: rsa.RSAPrivateKey, name: str):
186        self._key = key
187        self._name = name
188
189    def generate_code(self) -> str:
190        """Code generation"""
191        return byte_array_declaration(
192            self._key.private_bytes(
193                serialization.Encoding.DER,
194                serialization.PrivateFormat.TraditionalOpenSSL,
195                serialization.NoEncryption()), self._name)
196
197
198class CertificateGen(Codegen):
199    """Codegen class for a single certificate"""
200    def __init__(self, cert: x509.Certificate, name: str):
201        self._cert = cert
202        self._name = name
203
204    def generate_code(self) -> str:
205        """Code generation"""
206        return byte_array_declaration(
207            self._cert.public_bytes(serialization.Encoding.DER), self._name)
208
209
210def generate_test_data() -> str:
211    """Generates test data"""
212    subjects: List[Codegen] = []
213
214    # Working valid period.
215    # Start from yesterday, to make sure we are in the valid period.
216    not_before = datetime.utcnow() - timedelta(days=1)
217    # Valid for 1 year.
218    not_after = not_before + timedelta(days=365)
219
220    # Generate a root-A CA certificates
221    root_a = CA("root-A")
222    subjects.append(
223        CertificateGen(root_a.self_sign(not_before, not_after), "kRootACert"))
224
225    # Generate a sub CA certificate signed by root-A.
226    sub = CA("sub")
227    subjects.append(
228        CertificateGen(root_a.sign(sub, not_before, not_after), "kSubCACert"))
229
230    # Generate a valid server certificate signed by sub
231    server = Server("server")
232    subjects.append(
233        CertificateGen(sub.sign(server, not_before, not_after), "kServerCert"))
234    subjects.append(PrivateKeyGen(server.private_key(), "kServerKey"))
235
236    root_b = CA("root-B")
237    subjects.append(
238        CertificateGen(root_b.self_sign(not_before, not_after), "kRootBCert"))
239
240    code = 'namespace {\n\n'
241    for subject in subjects:
242        code += subject.generate_code() + '\n\n'
243    code += '}\n'
244
245    return code
246
247
248def clang_format(file):
249    subprocess.run([
250        "clang-format",
251        "-i",
252        file,
253    ], check=True)
254
255
256def parse_args():
257    """Setup argparse."""
258    parser = argparse.ArgumentParser()
259    parser.add_argument(
260        "certs_and_keys_header",
261        help="output header file for test certificates and keys")
262    return parser.parse_args()
263
264
265def main() -> int:
266    """Main"""
267    args = parse_args()
268
269    certs_and_keys = generate_test_data()
270
271    with open(args.certs_and_keys_header, 'w') as header:
272        header.write(CERTS_AND_KEYS_HEADER)
273        header.write(certs_and_keys)
274
275    clang_format(args.certs_and_keys_header)
276    return 0
277
278
279if __name__ == "__main__":
280    sys.exit(main())
281