#!/usr/bin/env python
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This script is called without any arguments to re-generate all of the *.pem
OCSP test data files.

The output files (and a few short-lived temporary files) are written using
relative paths, i.e. into the current working directory, so run the script
from the directory that should hold the generated data. The `openssl`
command line tool must be available on PATH.

NOTE: this script relies on Python 2 string semantics (for example
str.encode('hex') and passing str values to pyOpenSSL).
"""

import base64
import datetime
import hashlib
import os
import subprocess

from OpenSSL import crypto
from pyasn1.codec.der import decoder, encoder
from pyasn1.type import univ, useful
from pyasn1_modules import rfc2560, rfc2459

# Serial number handed to the next certificate created by CreateCert().
NEXT_SERIAL = 0

# 1/1/2017 00:00 GMT
CERT_DATE = datetime.datetime(2017, 1, 1, 0, 0)
# 1/1/2018 00:00 GMT
CERT_EXPIRE = CERT_DATE + datetime.timedelta(days=365)
# 2/1/2017 00:00 GMT
REVOKE_DATE = datetime.datetime(2017, 2, 1, 0, 0)
# 3/1/2017 00:00 GMT
THIS_DATE = datetime.datetime(2017, 3, 1, 0, 0)
# 3/2/2017 00:00 GMT
PRODUCED_DATE = datetime.datetime(2017, 3, 2, 0, 0)
# 6/1/2017 00:00 GMT
NEXT_DATE = datetime.datetime(2017, 6, 1, 0, 0)

sha1oid = univ.ObjectIdentifier('1.3.14.3.2.26')
sha1rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.5')
sha256oid = univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1')
sha256rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.11')


def SigAlgOid(sig_alg):
  """Returns the RSA signature algorithm OID for |sig_alg|.

  'sha1' maps to sha1rsaoid; any other value maps to sha256rsaoid.
  """
  if sig_alg == 'sha1':
    return sha1rsaoid
  return sha256rsaoid


def CreateCert(name, signer=None, ocsp=False):
  """Creates a certificate with a freshly generated 1024-bit RSA key.

  Args:
    name: Common name to use for the certificate subject.
    signer: Tuple previously returned by CreateCert() identifying the issuer,
        or None to create a self-signed certificate.
    ocsp: If True, adds a non-critical extendedKeyUsage extension with the
        value 'OCSPSigning'.

  Returns:
    A tuple (asn1cert, cert, pkey, issuer_asn1cert) where asn1cert is the
    pyasn1-decoded certificate, cert is the pyOpenSSL X509 object, pkey is
    the certificate's private key and issuer_asn1cert is the first element
    of |signer| (or asn1cert itself when self-signed).
  """
  global NEXT_SERIAL
  pkey = crypto.PKey()
  pkey.generate_key(crypto.TYPE_RSA, 1024)
  cert = crypto.X509()
  cert.set_version(2)
  cert.get_subject().CN = name
  cert.set_pubkey(pkey)
  cert.set_serial_number(NEXT_SERIAL)
  NEXT_SERIAL += 1
  cert.set_notBefore(CERT_DATE.strftime('%Y%m%d%H%M%SZ'))
  cert.set_notAfter(CERT_EXPIRE.strftime('%Y%m%d%H%M%SZ'))
  if ocsp:
    cert.add_extensions(
        [crypto.X509Extension('extendedKeyUsage', False, 'OCSPSigning')])
  if signer:
    cert.set_issuer(signer[1].get_subject())
    cert.sign(signer[2], 'sha1')
  else:
    cert.set_issuer(cert.get_subject())
    cert.sign(pkey, 'sha1')
  # Round-trip through DER to get a pyasn1 view of the signed certificate.
  asn1cert = decoder.decode(
      crypto.dump_certificate(crypto.FILETYPE_ASN1, cert),
      asn1Spec=rfc2459.Certificate())[0]
  if not signer:
    # A self-signed certificate is its own issuer.
    signer = [asn1cert]
  return (asn1cert, cert, pkey, signer[0])


def CreateExtension(oid='1.2.3.4', critical=False):
  """Creates an rfc2459.Extension with a placeholder value.

  Args:
    oid: Dotted-decimal string used for the extension's extnID.
    critical: Whether to mark the extension as critical.

  Returns:
    The populated rfc2459.Extension. Its extnValue is always the literal
    string 'DEADBEEF'.
  """
  ext = rfc2459.Extension()
  ext.setComponentByName('extnID', univ.ObjectIdentifier(oid))
  ext.setComponentByName('extnValue', 'DEADBEEF')
  if critical:
    ext.setComponentByName('critical', univ.Boolean('True'))
  else:
    ext.setComponentByName('critical', univ.Boolean('False'))

  return ext


# Certificates and extension shared by the test cases generated below. These
# must be defined before CreateSingleResponse(), which uses CERT as a default
# argument value.
ROOT_CA = CreateCert('Test CA', None)
CA = CreateCert('Test Intermediate CA', ROOT_CA)
CA_LINK = CreateCert('Test OCSP Signer', CA, True)
CA_BADLINK = CreateCert('Test False OCSP Signer', CA, False)
CERT = CreateCert('Test Cert', CA)
JUNK_CERT = CreateCert('Random Cert', None)
EXTENSION = CreateExtension()


def GetName(c):
  """Returns a ResponderID that identifies |c| by its subject (byName).

  Args:
    c: Tuple returned by CreateCert().
  """
  rid = rfc2560.ResponderID()
  subject = c[0].getComponentByName('tbsCertificate').getComponentByName(
      'subject')
  rn = rid.componentType.getTypeByPosition(0).clone()
  # Copy the subject's components into the byName alternative.
  for i in range(len(subject)):
    rn.setComponentByPosition(i, subject.getComponentByPosition(i))
  rid.setComponentByName('byName', rn)
  return rid


def GetKeyHash(c):
  """Returns a ResponderID that identifies |c| by a SHA-1 key hash (byKey).

  Args:
    c: Tuple returned by CreateCert().
  """
  rid = rfc2560.ResponderID()
  spk = c[0].getComponentByName('tbsCertificate').getComponentByName(
      'subjectPublicKeyInfo').getComponentByName('subjectPublicKey')
  # The first 4 bytes of the DER encoding are skipped before hashing.
  # NOTE(review): presumably these are the BIT STRING tag, length and
  # unused-bits bytes for the 1024-bit RSA keys generated by CreateCert();
  # the offset would need revisiting for other key sizes.
  keyHash = hashlib.sha1(encoder.encode(spk)[4:]).digest()
  rid.setComponentByName('byKey', keyHash)
  return rid


def CreateSingleResponse(cert=CERT,
                         status=0,
                         next=None,
                         revoke_time=None,
                         reason=None,
                         extensions=None):
  """Creates an rfc2560.SingleResponse describing |cert|.

  Args:
    cert: Tuple returned by CreateCert() for the certificate being described.
    status: 0 for 'good', 1 for 'revoked', anything else for 'unknown'.
    next: Optional datetime used for nextUpdate. Omitted when None.
    revoke_time: datetime used for revocationTime when |status| is 1.
        Defaults to REVOKE_DATE.
    reason: Optional revocationReason value, only used when |status| is 1.
        Omitted when None.
    extensions: Optional list of extensions to attach as singleExtensions.

  Returns:
    The populated rfc2560.SingleResponse. thisUpdate is always THIS_DATE.
  """
  sr = rfc2560.SingleResponse()
  cid = sr.setComponentByName('certID').getComponentByName('certID')

  issuer_tbs = cert[3].getComponentByName('tbsCertificate')
  tbs = cert[0].getComponentByName('tbsCertificate')
  name_hash = hashlib.sha1(
      encoder.encode(issuer_tbs.getComponentByName('subject'))).digest()
  # As in GetKeyHash(), the first 4 bytes of the encoded subjectPublicKey are
  # skipped before hashing.
  key_hash = hashlib.sha1(
      encoder.encode(
          issuer_tbs.getComponentByName('subjectPublicKeyInfo')
          .getComponentByName('subjectPublicKey'))[4:]).digest()
  sn = tbs.getComponentByName('serialNumber')

  ha = cid.setComponentByName('hashAlgorithm').getComponentByName(
      'hashAlgorithm')
  ha.setComponentByName('algorithm', sha1oid)
  cid.setComponentByName('issuerNameHash', name_hash)
  cid.setComponentByName('issuerKeyHash', key_hash)
  cid.setComponentByName('serialNumber', sn)

  cs = rfc2560.CertStatus()
  if status == 0:
    cs.setComponentByName('good')
  elif status == 1:
    ri = cs.componentType.getTypeByPosition(1).clone()
    if revoke_time is None:
      revoke_time = REVOKE_DATE
    ri.setComponentByName('revocationTime',
                          useful.GeneralizedTime(
                              revoke_time.strftime('%Y%m%d%H%M%SZ')))
    # Compare against None so that a reason code of 0 is still emitted.
    if reason is not None:
      ri.setComponentByName('revocationReason', reason)
    cs.setComponentByName('revoked', ri)
  else:
    ui = cs.componentType.getTypeByPosition(2).clone()
    cs.setComponentByName('unknown', ui)

  sr.setComponentByName('certStatus', cs)

  sr.setComponentByName('thisUpdate',
                        useful.GeneralizedTime(
                            THIS_DATE.strftime('%Y%m%d%H%M%SZ')))
  if next:
    # Passed as a plain string (not wrapped in GeneralizedTime), unlike
    # thisUpdate above.
    sr.setComponentByName('nextUpdate', next.strftime('%Y%m%d%H%M%SZ'))
  if extensions:
    elist = sr.setComponentByName('singleExtensions').getComponentByName(
        'singleExtensions')
    for i, extension in enumerate(extensions):
      elist.setComponentByPosition(i, extension)
  return sr


def Create(signer=None,
           response_status=0,
           response_type='1.3.6.1.5.5.7.48.1.1',
           signature=None,
           version=1,
           responder=None,
           responses=None,
           extensions=None,
           certs=None,
           sigAlg='sha1'):
  """Creates an rfc2560.OCSPResponse.

  Args:
    signer: Tuple returned by CreateCert() whose key signs the response.
        Defaults to CA.
    response_status: Value for responseStatus. When non-zero, the returned
        response contains only the status.
    response_type: Dotted-decimal OID string for responseType.
    signature: Raw signature bytes to embed. When not given, the encoded
        ResponseData is signed with the signer's key using |sigAlg|.
    version: ResponseData version. Only set explicitly when it is not 1.
    responder: ResponderID to embed. Defaults to GetName(signer).
    responses: List of SingleResponses. None means a single 'good' response
        for CERT; an empty list means no responses at all.
    extensions: Optional list of extensions to attach as responseExtensions.
    certs: Optional list of CreateCert() tuples to embed in 'certs'.
    sigAlg: Digest name passed to crypto.sign() and SigAlgOid().

  Returns:
    The populated rfc2560.OCSPResponse.
  """
  ocsp = rfc2560.OCSPResponse()
  ocsp.setComponentByName('responseStatus', response_status)

  if response_status != 0:
    return ocsp

  tbs = rfc2560.ResponseData()
  if version != 1:
    tbs.setComponentByName('version', version)

  if not signer:
    signer = CA
  if not responder:
    responder = GetName(signer)
  tbs.setComponentByName('responderID', responder)
  tbs.setComponentByName('producedAt',
                         useful.GeneralizedTime(
                             PRODUCED_DATE.strftime('%Y%m%d%H%M%SZ')))
  rlist = tbs.setComponentByName('responses').getComponentByName('responses')
  # Only None selects the default; an explicit empty list is kept empty.
  if responses is None:
    responses = [CreateSingleResponse(CERT, 0)]
  if responses:
    for i, response in enumerate(responses):
      rlist.setComponentByPosition(i, response)

  if extensions:
    elist = tbs.setComponentByName('responseExtensions').getComponentByName(
        'responseExtensions')
    for i, extension in enumerate(extensions):
      elist.setComponentByPosition(i, extension)

  sa = rfc2459.AlgorithmIdentifier()
  sa.setComponentByName('algorithm', SigAlgOid(sigAlg))
  # TODO(mattm): If pyasn1 gives an error
  # "Component value is tag-incompatible: Null() vs Any()", try hacking
  # pyasn1_modules/rfc2459.py's AlgorithmIdentifier to specify univ.Null as the
  # type for 'parameters'. (Which is an ugly hack, but lets the script work.)
  sa.setComponentByName('parameters', univ.Null())

  basic = rfc2560.BasicOCSPResponse()
  basic.setComponentByName('tbsResponseData', tbs)
  basic.setComponentByName('signatureAlgorithm', sa)
  if not signature:
    signature = crypto.sign(signer[2], encoder.encode(tbs), sigAlg)
  # str.encode('hex') is Python 2 only.
  basic.setComponentByName('signature',
                           univ.BitString("'%s'H" % (signature.encode('hex'))))
  if certs:
    cs = basic.setComponentByName('certs').getComponentByName('certs')
    for i, cert in enumerate(certs):
      cs.setComponentByPosition(i, cert[0])

  # NOTE(review): unlike the CertStatus alternatives in
  # CreateSingleResponse(), this type object is used without clone(), so it
  # is presumably shared between calls. Both of its fields are overwritten on
  # every call, but confirm before holding on to several responses at once.
  rbytes = ocsp.componentType.getTypeByPosition(1)
  rbytes.setComponentByName('responseType',
                            univ.ObjectIdentifier(response_type))
  rbytes.setComponentByName('response', encoder.encode(basic))

  ocsp.setComponentByName('responseBytes', rbytes)
  return ocsp


def MakePemBlock(der, name):
  """Returns |der| base64-encoded as a PEM block labelled |name|.

  The base64 text is wrapped at 64 characters per line. The returned string
  has no trailing newline.
  """
  b64 = base64.b64encode(der)
  wrapped = '\n'.join(b64[pos:pos + 64] for pos in range(0, len(b64), 64))
  return '-----BEGIN %s-----\n%s\n-----END %s-----' % (name, wrapped, name)


def WriteStringToFile(data, path):
  """Writes |data| to the file at |path|, replacing any existing content."""
  with open(path, "w") as f:
    f.write(data)


def ReadFileToString(path, mode='r'):
  """Returns the entire content of the file at |path|.

  Args:
    path: File to read.
    mode: Mode passed to open(). Use 'rb' for binary data such as DER.
  """
  with open(path, mode) as f:
    return f.read()


def CreateOCSPRequestDer(issuer_cert_pem, cert_pem):
  '''Uses OpenSSL to generate a basic OCSPRequest for |cert_pem|.

  The certificates are handed to the `openssl ocsp` command through
  temporary files in the current working directory, which are always removed
  again before returning.

  Args:
    issuer_cert_pem: PEM data of the issuer's certificate.
    cert_pem: PEM data of the certificate to generate the request for.

  Returns:
    The DER-encoded request, or None if openssl exited with a non-zero
    status.
  '''

  issuer_path = "tmp_issuer.pem"
  cert_path = "tmp_cert.pem"
  request_path = "tmp_request.der"

  try:
    WriteStringToFile(issuer_cert_pem, issuer_path)
    WriteStringToFile(cert_pem, cert_path)

    p = subprocess.Popen(["openssl", "ocsp", "-no_nonce", "-issuer", issuer_path,
                          "-cert", cert_path, "-reqout", request_path],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    p.communicate()

    if p.returncode != 0:
      return None
    # The request is DER, so read it as binary data.
    return ReadFileToString(request_path, 'rb')
  finally:
    # Clean up even when something above failed. The request file may not
    # exist if openssl did not succeed.
    for path in (issuer_path, cert_path, request_path):
      if os.path.exists(path):
        os.remove(path)


def Store(fname, description, ca, data):
  """Writes one test case to '<fname>.pem' in the current directory.

  The file contains |description|, followed by PEM blocks for the OCSP
  response, the CA certificate, the certificate CERT and an OCSP request for
  CERT.

  Args:
    fname: Output file name, without the '.pem' suffix.
    description: Human readable text placed at the top of the file.
    ca: Tuple returned by CreateCert() for the issuing CA.
    data: pyasn1 OCSP response to DER-encode and store.

  Raises:
    RuntimeError: If the OCSP request could not be generated.
  """
  ca_cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, ca[1])
  cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, CERT[1])

  ocsp_request_der = CreateOCSPRequestDer(ca_cert_pem, cert_pem)
  if ocsp_request_der is None:
    raise RuntimeError(
        'openssl failed to generate the OCSP request for %s' % fname)

  out = ('%s\n%s\n%s\n\n%s\n%s') % (
      description,
      MakePemBlock(encoder.encode(data), "OCSP RESPONSE"),
      ca_cert_pem.replace('CERTIFICATE', 'CA CERTIFICATE'),
      cert_pem,
      MakePemBlock(ocsp_request_der, "OCSP REQUEST"))
  WriteStringToFile(out, '%s.pem' % fname)


Store(
    'no_response',
    'No SingleResponses attached to the response',
    CA,
    Create(responses=[]))

Store(
    'malformed_request',
    'Has a status of MALFORMED_REQUEST',
    CA,
    Create(response_status=1))
Store(
    'bad_status',
    'Has an invalid status larger than the defined Status enumeration',
    CA,
    Create(response_status=17))
Store(
    'bad_ocsp_type',
    'Has an invalid OCSP OID',
    CA,
    Create(response_type='1.3.6.1.5.5.7.48.1.2'))
Store(
    'bad_signature',
    'Has an invalid signature',
    CA,
    Create(signature='\xde\xad\xbe\xef'))
Store('ocsp_sign_direct', 'Signed directly by the issuer', CA,
      Create(signer=CA, certs=[]))
Store('ocsp_sign_indirect', 'Signed indirectly through an intermediate', CA,
      Create(signer=CA_LINK, certs=[CA_LINK]))
Store('ocsp_sign_indirect_missing',
      'Signed indirectly through a missing intermediate', CA,
      Create(signer=CA_LINK, certs=[]))
Store('ocsp_sign_bad_indirect',
      'Signed through an intermediate without the correct key usage', CA,
      Create(signer=CA_BADLINK, certs=[CA_BADLINK]))
Store('ocsp_extra_certs', 'Includes extra certs', CA,
      Create(signer=CA, certs=[CA, CA_LINK]))
Store('has_version', 'Includes a default version V1', CA, Create(version=1))
Store(
    'responder_name',
    'Uses byName to identify the signer',
    CA,
    Create(responder=GetName(CA)))

# TODO(eroman): pyasn1 module has a bug in rfc2560.ResponderID() that will use
# IMPLICIT rather than EXPLICIT tagging for byKey
# (https://github.com/etingof/pyasn1-modules/issues/8). If using an affected
# version of the library you will need to patch pyasn1_modules/rfc2560.py and
# replace "implicitTag" with "explicitTag" in ResponderID to generate this
# test data correctly.
Store(
    'responder_id',
    'Uses byKey to identify the signer',
    CA,
    Create(responder=GetKeyHash(CA)))
Store(
    'has_extension',
    'Includes an x509v3 extension',
    CA,
    Create(extensions=[EXTENSION]))

Store(
    'good_response',
    'Is a valid response for the cert',
    CA,
    Create(responses=[CreateSingleResponse(CERT, 0)]))
Store('good_response_sha256',
      'Is a valid response for the cert with a SHA256 signature', CA,
      Create(responses=[CreateSingleResponse(CERT, 0)], sigAlg='sha256'))
Store(
    'good_response_next_update',
    'Is a valid response for the cert until nextUpdate',
    CA,
    Create(responses=[CreateSingleResponse(CERT, 0, next=NEXT_DATE)]))
Store(
    'revoke_response',
    'Is a REVOKE response for the cert',
    CA,
    Create(responses=[CreateSingleResponse(CERT, 1)]))
Store(
    'revoke_response_reason',
    'Is a REVOKE response for the cert with a reason',
    CA,
    Create(responses=[
        CreateSingleResponse(CERT, 1, revoke_time=REVOKE_DATE, reason=1)
    ]))
Store(
    'unknown_response',
    'Is an UNKNOWN response for the cert',
    CA,
    Create(responses=[CreateSingleResponse(CERT, 2)]))
Store(
    'multiple_response',
    'Has multiple responses for the cert',
    CA,
    Create(responses=[
        CreateSingleResponse(CERT, 0),
        CreateSingleResponse(CERT, 2)
    ]))
Store(
    'other_response',
    'Is a response for a different cert',
    CA,
    Create(responses=[
        CreateSingleResponse(JUNK_CERT, 0),
        CreateSingleResponse(JUNK_CERT, 1)
    ]))
Store(
    'has_single_extension',
    'Has an extension in the SingleResponse',
    CA,
    Create(responses=[
        CreateSingleResponse(CERT, 0, extensions=[CreateExtension()])
    ]))
Store(
    'has_critical_single_extension',
    'Has a critical extension in the SingleResponse', CA,
    Create(responses=[
        CreateSingleResponse(
            CERT, 0, extensions=[CreateExtension('1.2.3.4', critical=True)])
    ]))
Store(
    'has_critical_response_extension',
    'Has a critical extension in the ResponseData', CA,
    Create(
        responses=[CreateSingleResponse(CERT, 0)],
        extensions=[CreateExtension('1.2.3.4', critical=True)]))
Store(
    'has_critical_ct_extension',
    'Has a critical CT extension in the SingleResponse', CA,
    Create(responses=[
        CreateSingleResponse(
            CERT,
            0,
            extensions=[
                CreateExtension('1.3.6.1.4.1.11129.2.4.5', critical=True)
            ])
    ]))

Store('missing_response', 'Missing a response for the cert', CA,
      Create(response_status=0, responses=[]))