• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2017 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""This script is called without any arguments to re-generate all of the *.pem
6files in the script's parent directory.
7
8"""
9
10from pyasn1.codec.der import decoder, encoder
11from pyasn1_modules import rfc2560, rfc2459
12from pyasn1.type import univ, useful
13import hashlib, datetime
14import subprocess
15import os
16
17from OpenSSL import crypto
18
19import base64
20
# Serial number handed to the next certificate; CreateCert() increments it.
NEXT_SERIAL = 0

# Fixed timestamps used throughout the generated data. They are naive
# datetimes that the rest of the script formats with a literal 'Z' suffix.

# 2017-01-01 00:00 GMT
CERT_DATE = datetime.datetime(year=2017, month=1, day=1)
# 2018-01-01 00:00 GMT (2017 is not a leap year, so +365 days is one year).
CERT_EXPIRE = CERT_DATE + datetime.timedelta(days=365)
# 2017-02-01 00:00 GMT
REVOKE_DATE = datetime.datetime(year=2017, month=2, day=1)
# 2017-03-01 00:00 GMT
THIS_DATE = datetime.datetime(year=2017, month=3, day=1)
# 2017-03-02 00:00 GMT
PRODUCED_DATE = datetime.datetime(year=2017, month=3, day=2)
# 2017-06-01 00:00 GMT
NEXT_DATE = datetime.datetime(year=2017, month=6, day=1)
35
# Object identifiers for the digest and signature algorithms used below.
# SHA-1 digest (used as the CertID hashAlgorithm).
sha1oid = univ.ObjectIdentifier('1.3.14.3.2.26')
# sha1WithRSAEncryption signature algorithm.
sha1rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.5')
# SHA-256 digest.
sha256oid = univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1')
# sha256WithRSAEncryption signature algorithm.
sha256rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.11')
40
41
def SigAlgOid(sig_alg):
  """Returns the RSA signature-algorithm OID matching |sig_alg|.

  Args:
    sig_alg: Digest name. 'sha1' selects sha1rsaoid; every other value
        selects sha256rsaoid.
  """
  return sha1rsaoid if sig_alg == 'sha1' else sha256rsaoid
46
47
def CreateCert(name, signer=None, ocsp=False):
  """Creates a certificate with a fresh 1024-bit RSA key, signed with SHA-1.

  Args:
    name: Common name placed in the certificate subject.
    signer: Tuple previously returned by CreateCert() to use as the issuer.
        When omitted the certificate is self-signed.
    ocsp: If true, adds a non-critical extendedKeyUsage=OCSPSigning
        extension.

  Returns:
    A tuple (decoded pyasn1 certificate, crypto.X509 certificate, private
    key, decoded pyasn1 certificate of the issuer). For a self-signed
    certificate the first and last elements are the same object.
  """
  global NEXT_SERIAL
  time_format = '%Y%m%d%H%M%SZ'

  key = crypto.PKey()
  key.generate_key(crypto.TYPE_RSA, 1024)

  x509 = crypto.X509()
  x509.set_version(2)
  x509.get_subject().CN = name
  x509.set_pubkey(key)
  x509.set_serial_number(NEXT_SERIAL)
  NEXT_SERIAL += 1
  x509.set_notBefore(CERT_DATE.strftime(time_format))
  x509.set_notAfter(CERT_EXPIRE.strftime(time_format))
  if ocsp:
    eku = crypto.X509Extension('extendedKeyUsage', False, 'OCSPSigning')
    x509.add_extensions([eku])

  # Without a signer the certificate names itself as issuer and is signed
  # with its own key.
  issuer_name = signer[1].get_subject() if signer else x509.get_subject()
  signing_key = signer[2] if signer else key
  x509.set_issuer(issuer_name)
  x509.sign(signing_key, 'sha1')

  # Re-parse the finished certificate with pyasn1 so later code can read its
  # fields.
  der = crypto.dump_certificate(crypto.FILETYPE_ASN1, x509)
  parsed = decoder.decode(der, asn1Spec=rfc2459.Certificate())[0]
  issuer_parsed = signer[0] if signer else parsed
  return (parsed, x509, key, issuer_parsed)
75
76
def CreateExtension(oid='1.2.3.4', critical=False):
  """Builds a placeholder rfc2459.Extension.

  Args:
    oid: Dotted OID string used as extnID.
    critical: Whether the 'critical' flag is set to True or False.

  Returns:
    The extension, whose extnValue is always the string 'DEADBEEF'.
  """
  flag = univ.Boolean('True') if critical else univ.Boolean('False')

  extension = rfc2459.Extension()
  extension.setComponentByName('extnID', univ.ObjectIdentifier(oid))
  extension.setComponentByName('extnValue', 'DEADBEEF')
  extension.setComponentByName('critical', flag)
  return extension
87
88
# Certificates and extension shared by every generated test case. The order
# matters: CreateCert() hands out serial numbers sequentially.

# Self-signed root.
ROOT_CA = CreateCert('Test CA')
# Intermediate issued by the root; it issues everything below.
CA = CreateCert('Test Intermediate CA', signer=ROOT_CA)
# Delegated responder certificate carrying the OCSPSigning key usage.
CA_LINK = CreateCert('Test OCSP Signer', signer=CA, ocsp=True)
# Issued by the same CA but without the OCSPSigning key usage.
CA_BADLINK = CreateCert('Test False OCSP Signer', signer=CA, ocsp=False)
# The certificate whose status the responses describe.
CERT = CreateCert('Test Cert', signer=CA)
# Unrelated self-signed certificate.
JUNK_CERT = CreateCert('Random Cert')
# Default non-critical placeholder extension.
EXTENSION = CreateExtension()
96
97
def GetName(c):
  """Returns an rfc2560.ResponderID that identifies |c| by subject name.

  Args:
    c: Tuple as returned by CreateCert(); only the decoded certificate at
        index 0 is used.
  """
  subject_name = c[0].getComponentByName('tbsCertificate').getComponentByName(
      'subject')
  responder_id = rfc2560.ResponderID()
  # Start from an empty copy of the first ResponderID alternative and copy
  # the subject into it component by component.
  by_name = responder_id.componentType.getTypeByPosition(0).clone()
  for idx in range(len(subject_name)):
    by_name.setComponentByPosition(
        idx, subject_name.getComponentByPosition(idx))
  responder_id.setComponentByName('byName', by_name)
  return responder_id
107
108
def GetKeyHash(c):
  """Returns an rfc2560.ResponderID that identifies |c| by public-key hash.

  Args:
    c: Tuple as returned by CreateCert(); only the decoded certificate at
        index 0 is used.
  """
  tbs = c[0].getComponentByName('tbsCertificate')
  spki = tbs.getComponentByName('subjectPublicKeyInfo')
  public_key = spki.getComponentByName('subjectPublicKey')

  # Hash the encoded key minus its first four bytes. NOTE(review): presumably
  # these are the BIT STRING tag, length and unused-bits octets for the key
  # size CreateCert() uses -- confirm before changing the key size.
  encoded_key = encoder.encode(public_key)
  digest = hashlib.sha1(encoded_key[4:]).digest()

  responder_id = rfc2560.ResponderID()
  responder_id.setComponentByName('byKey', digest)
  return responder_id
116
117
def CreateSingleResponse(cert=CERT,
                         status=0,
                         next=None,
                         revoke_time=None,
                         reason=None,
                         extensions=None):
  """Builds an rfc2560.SingleResponse describing the status of |cert|.

  Args:
    cert: Tuple as returned by CreateCert(). Index 0 (the certificate) and
        index 3 (its issuer) are read, both as decoded pyasn1 objects.
    status: 0 for 'good', 1 for 'revoked'; any other value gives 'unknown'.
    next: Optional datetime written as nextUpdate. Omitted when not given.
    revoke_time: datetime written as revocationTime when |status| is 1.
        Defaults to REVOKE_DATE.
    reason: Optional revocationReason written when |status| is 1. Any value
        other than None is written, including 0.
    extensions: Optional list of extensions written as singleExtensions.

  Returns:
    The populated rfc2560.SingleResponse.
  """
  time_format = '%Y%m%d%H%M%SZ'

  sr = rfc2560.SingleResponse()
  cid = sr.setComponentByName('certID').getComponentByName('certID')

  # The CertID identifies the certificate by SHA-1 hashes of its issuer's
  # name and public key plus its own serial number.
  issuer_tbs = cert[3].getComponentByName('tbsCertificate')
  tbs = cert[0].getComponentByName('tbsCertificate')
  name_hash = hashlib.sha1(
      encoder.encode(issuer_tbs.getComponentByName('subject'))).digest()
  # NOTE(review): the [4:] presumably drops the BIT STRING tag, length and
  # unused-bits octets for the key size CreateCert() uses -- confirm before
  # changing the key size.
  key_hash = hashlib.sha1(
      encoder.encode(
          issuer_tbs.getComponentByName('subjectPublicKeyInfo')
          .getComponentByName('subjectPublicKey'))[4:]).digest()
  sn = tbs.getComponentByName('serialNumber')

  ha = cid.setComponentByName('hashAlgorithm').getComponentByName(
      'hashAlgorithm')
  ha.setComponentByName('algorithm', sha1oid)
  cid.setComponentByName('issuerNameHash', name_hash)
  cid.setComponentByName('issuerKeyHash', key_hash)
  cid.setComponentByName('serialNumber', sn)

  cs = rfc2560.CertStatus()
  if status == 0:
    cs.setComponentByName('good')
  elif status == 1:
    ri = cs.componentType.getTypeByPosition(1).clone()
    if revoke_time is None:
      revoke_time = REVOKE_DATE
    ri.setComponentByName('revocationTime',
                          useful.GeneralizedTime(
                              revoke_time.strftime(time_format)))
    # Compare against None rather than truthiness so that a reason code of 0
    # is not silently dropped.
    if reason is not None:
      ri.setComponentByName('revocationReason', reason)
    cs.setComponentByName('revoked', ri)
  else:
    ui = cs.componentType.getTypeByPosition(2).clone()
    cs.setComponentByName('unknown', ui)

  sr.setComponentByName('certStatus', cs)

  sr.setComponentByName('thisUpdate',
                        useful.GeneralizedTime(
                            THIS_DATE.strftime(time_format)))
  if next:
    # Passed as a plain string, unlike thisUpdate; left exactly as the
    # original wrote it.
    sr.setComponentByName('nextUpdate', next.strftime(time_format))
  if extensions:
    elist = sr.setComponentByName('singleExtensions').getComponentByName(
        'singleExtensions')
    for i, extension in enumerate(extensions):
      elist.setComponentByPosition(i, extension)
  return sr
174
175
def Create(signer=None,
           response_status=0,
           response_type='1.3.6.1.5.5.7.48.1.1',
           signature=None,
           version=1,
           responder=None,
           responses=None,
           extensions=None,
           certs=None,
           sigAlg='sha1'):
  """Builds an rfc2560.OCSPResponse.

  Args:
    signer: CreateCert() tuple whose private key (index 2) signs the
        response. Defaults to CA.
    response_status: Value written as responseStatus. For any non-zero value
        the response is returned without responseBytes.
    response_type: Dotted OID string written as responseType.
    signature: Raw signature to embed instead of computing one.
    version: ResponseData version; only written when it is not 1.
    responder: ResponderID to embed. Defaults to GetName(signer).
    responses: List of SingleResponse objects. None means one 'good'
        response for CERT; an empty list gives a response list with no
        entries.
    extensions: Optional list of extensions written as responseExtensions.
    certs: Optional list of CreateCert() tuples; the decoded certificate of
        each (index 0) is attached under 'certs'.
    sigAlg: Digest name passed to crypto.sign(); also selects the
        signatureAlgorithm OID through SigAlgOid().

  Returns:
    The populated rfc2560.OCSPResponse.
  """
  ocsp = rfc2560.OCSPResponse()
  ocsp.setComponentByName('responseStatus', response_status)

  if response_status != 0:
    return ocsp

  tbs = rfc2560.ResponseData()
  if version != 1:
    tbs.setComponentByName('version', version)

  if not signer:
    signer = CA
  if not responder:
    responder = GetName(signer)
  tbs.setComponentByName('responderID', responder)
  tbs.setComponentByName('producedAt',
                         useful.GeneralizedTime(
                             PRODUCED_DATE.strftime('%Y%m%d%H%M%SZ')))
  rlist = tbs.setComponentByName('responses').getComponentByName('responses')
  if responses is None:
    responses = [CreateSingleResponse(CERT, 0)]
  for i, response in enumerate(responses):
    rlist.setComponentByPosition(i, response)

  if extensions:
    elist = tbs.setComponentByName('responseExtensions').getComponentByName(
        'responseExtensions')
    for i, extension in enumerate(extensions):
      elist.setComponentByPosition(i, extension)

  sa = rfc2459.AlgorithmIdentifier()
  sa.setComponentByName('algorithm', SigAlgOid(sigAlg))
  # TODO(mattm): If pyasn1 gives an error
  # "Component value is tag-incompatible: Null() vs Any()", try hacking
  # pyasn1_modules/rfc2459.py's AlgorithmIdentifier to specify univ.Null as the
  # type for 'parameters'. (Which is an ugly hack, but lets the script work.)
  sa.setComponentByName('parameters', univ.Null())

  basic = rfc2560.BasicOCSPResponse()
  basic.setComponentByName('tbsResponseData', tbs)
  basic.setComponentByName('signatureAlgorithm', sa)
  if not signature:
    signature = crypto.sign(signer[2], encoder.encode(tbs), sigAlg)
  # str.encode('hex') is the Python 2 API this script is written against.
  basic.setComponentByName('signature',
                           univ.BitString("'%s'H" % (signature.encode('hex'))))
  if certs:
    cs = basic.setComponentByName('certs').getComponentByName('certs')
    for i, cert in enumerate(certs):
      cs.setComponentByPosition(i, cert[0])

  # Fill in a clone, as CreateSingleResponse() does for CertStatus, so the
  # prototype held in OCSPResponse's component type is not mutated and shared
  # between every response this function returns.
  rbytes = ocsp.componentType.getTypeByPosition(1).clone()
  rbytes.setComponentByName('responseType',
                            univ.ObjectIdentifier(response_type))
  rbytes.setComponentByName('response', encoder.encode(basic))

  ocsp.setComponentByName('responseBytes', rbytes)
  return ocsp
244
245
def MakePemBlock(der, name):
  """Wraps |der| in a PEM block labelled |name|.

  Args:
    der: Raw bytes to encode.
    name: Label used in the BEGIN and END lines.

  Returns:
    The PEM text: a BEGIN line, the base64 of |der| wrapped at 64 characters
    per line, and an END line, with no trailing newline.
  """
  b64 = base64.b64encode(der)
  # b64encode() returns bytes on Python 3; convert so the string operations
  # below work. On Python 2 the result is already str and is left untouched.
  if not isinstance(b64, str):
    b64 = b64.decode('ascii')
  # range() instead of the Python 2-only xrange().
  wrapped = '\n'.join(b64[pos:pos + 64] for pos in range(0, len(b64), 64))
  return '-----BEGIN %s-----\n%s\n-----END %s-----' % (name, wrapped, name)
250
251
def WriteStringToFile(data, path):
  """Writes |data| to the file at |path|, replacing any existing content."""
  with open(path, "w") as out_file:
    out_file.write(data)
255
256
def ReadFileToString(path):
  """Returns the entire content of the file at |path|, opened in text mode."""
  with open(path, 'r') as in_file:
    content = in_file.read()
  return content
260
261
def CreateOCSPRequestDer(issuer_cert_pem, cert_pem):
  '''Uses OpenSSL to generate a basic OCSPRequest for |cert_pem|.

  Both certificates are written to temporary files in the current directory,
  the `openssl ocsp` command is run on them, and the temporary files are
  removed again.

  Args:
    issuer_cert_pem: PEM text of the issuer's certificate.
    cert_pem: PEM text of the certificate to ask about.

  Returns:
    The DER-encoded request, or None if openssl exited with an error.
  '''

  issuer_path = "tmp_issuer.pem"
  cert_path = "tmp_cert.pem"
  request_path = "tmp_request.der"

  try:
    WriteStringToFile(issuer_cert_pem, issuer_path)
    WriteStringToFile(cert_pem, cert_path)

    p = subprocess.Popen(["openssl", "ocsp", "-no_nonce", "-issuer",
                          issuer_path, "-cert", cert_path, "-reqout",
                          request_path],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # The command's output is not used; communicate() just waits for exit.
    p.communicate()

    if p.returncode != 0:
      return None

    # The request is binary DER, so read it without newline translation.
    with open(request_path, 'rb') as f:
      return f.read()
  finally:
    # Clean up whatever was created. The request file does not exist when
    # openssl failed, so removing it unconditionally would raise.
    for path in (issuer_path, cert_path, request_path):
      if os.path.exists(path):
        os.remove(path)
288
289
def Store(fname, description, ca, data):
  """Writes one test case to '<fname>.pem' in the current directory.

  The file contains |description|, the OCSP response, the CA certificate
  (labelled 'CA CERTIFICATE'), the certificate CERT, and an OCSP request for
  CERT generated by openssl.

  Args:
    fname: Output file name without the '.pem' suffix.
    description: Free-form text placed at the top of the file.
    ca: CreateCert() tuple of the issuing CA; its X509 object (index 1) is
        written out.
    data: pyasn1 object holding the OCSP response to encode.

  Raises:
    RuntimeError: If the OCSP request could not be generated.
  """
  ca_cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, ca[1])
  cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, CERT[1])

  ocsp_request_der = CreateOCSPRequestDer(ca_cert_pem, cert_pem)
  # Fail with a clear message instead of an opaque TypeError from base64.
  if ocsp_request_der is None:
    raise RuntimeError(
        'openssl failed to generate the OCSP request for %s' % fname)

  out = ('%s\n%s\n%s\n\n%s\n%s') % (
      description,
      MakePemBlock(encoder.encode(data), "OCSP RESPONSE"),
      ca_cert_pem.replace('CERTIFICATE', 'CA CERTIFICATE'),
      cert_pem,
      MakePemBlock(ocsp_request_der, "OCSP REQUEST"))
  # Close the file deterministically rather than relying on garbage
  # collection to flush it.
  with open('%s.pem' % fname, 'w') as f:
    f.write(out)
303
304
# Generate every test file. Each response is built and written before the
# next one is created; keep the calls in this order.

# Variations of the outer response: empty response list, error statuses,
# wrong response type, bogus signature.
Store('no_response',
      'No SingleResponses attached to the response',
      CA,
      Create(responses=[]))
Store('malformed_request',
      'Has a status of MALFORMED_REQUEST',
      CA,
      Create(response_status=1))
Store('bad_status',
      'Has an invalid status larger than the defined Status enumeration',
      CA,
      Create(response_status=17))
Store('bad_ocsp_type',
      'Has an invalid OCSP OID',
      CA,
      Create(response_type='1.3.6.1.5.5.7.48.1.2'))
Store('bad_signature',
      'Has an invalid signature',
      CA,
      Create(signature='\xde\xad\xbe\xef'))

# Variations of who signs the response and which certificates are attached.
Store('ocsp_sign_direct',
      'Signed directly by the issuer',
      CA,
      Create(signer=CA, certs=[]))
Store('ocsp_sign_indirect',
      'Signed indirectly through an intermediate',
      CA,
      Create(signer=CA_LINK, certs=[CA_LINK]))
Store('ocsp_sign_indirect_missing',
      'Signed indirectly through a missing intermediate',
      CA,
      Create(signer=CA_LINK, certs=[]))
Store('ocsp_sign_bad_indirect',
      'Signed through an intermediate without the correct key usage',
      CA,
      Create(signer=CA_BADLINK, certs=[CA_BADLINK]))
Store('ocsp_extra_certs',
      'Includes extra certs',
      CA,
      Create(signer=CA, certs=[CA, CA_LINK]))

# Variations of the ResponseData fields.
Store('has_version',
      'Includes a default version V1',
      CA,
      Create(version=1))
Store('responder_name',
      'Uses byName to identify the signer',
      CA,
      Create(responder=GetName(CA)))

# TODO(eroman): pyasn1 module has a bug in rfc2560.ResponderID() that will use
# IMPLICIT rather than EXPLICIT tagging for byKey
# (https://github.com/etingof/pyasn1-modules/issues/8). If using an affected
# version of the library you will need to patch pyasn1_modules/rfc2560.py and
# replace "implicitTag" with "explicitTag" in ResponderID to generate this
# test data correctly.
Store('responder_id',
      'Uses byKey to identify the signer',
      CA,
      Create(responder=GetKeyHash(CA)))
Store('has_extension',
      'Includes an x509v3 extension',
      CA,
      Create(extensions=[EXTENSION]))

# Variations of the certificate status and of the certificate described.
Store('good_response',
      'Is a valid response for the cert',
      CA,
      Create(responses=[CreateSingleResponse(CERT, status=0)]))
Store('good_response_sha256',
      'Is a valid response for the cert with a SHA256 signature',
      CA,
      Create(responses=[CreateSingleResponse(CERT, status=0)],
             sigAlg='sha256'))
Store('good_response_next_update',
      'Is a valid response for the cert until nextUpdate',
      CA,
      Create(responses=[
          CreateSingleResponse(CERT, status=0, next=NEXT_DATE),
      ]))
Store('revoke_response',
      'Is a REVOKE response for the cert',
      CA,
      Create(responses=[CreateSingleResponse(CERT, status=1)]))
Store('revoke_response_reason',
      'Is a REVOKE response for the cert with a reason',
      CA,
      Create(responses=[
          CreateSingleResponse(
              CERT, status=1, revoke_time=REVOKE_DATE, reason=1),
      ]))
Store('unknown_response',
      'Is an UNKNOWN response for the cert',
      CA,
      Create(responses=[CreateSingleResponse(CERT, status=2)]))
Store('multiple_response',
      'Has multiple responses for the cert',
      CA,
      Create(responses=[
          CreateSingleResponse(CERT, status=0),
          CreateSingleResponse(CERT, status=2),
      ]))
Store('other_response',
      'Is a response for a different cert',
      CA,
      Create(responses=[
          CreateSingleResponse(JUNK_CERT, status=0),
          CreateSingleResponse(JUNK_CERT, status=1),
      ]))

# Extensions, non-critical and critical, at both levels of the response.
Store('has_single_extension',
      'Has an extension in the SingleResponse',
      CA,
      Create(responses=[
          CreateSingleResponse(
              CERT, status=0, extensions=[CreateExtension()]),
      ]))
Store('has_critical_single_extension',
      'Has a critical extension in the SingleResponse',
      CA,
      Create(responses=[
          CreateSingleResponse(
              CERT,
              status=0,
              extensions=[CreateExtension('1.2.3.4', critical=True)]),
      ]))
Store('has_critical_response_extension',
      'Has a critical extension in the ResponseData',
      CA,
      Create(responses=[CreateSingleResponse(CERT, status=0)],
             extensions=[CreateExtension('1.2.3.4', critical=True)]))
Store('has_critical_ct_extension',
      'Has a critical CT extension in the SingleResponse',
      CA,
      Create(responses=[
          CreateSingleResponse(
              CERT,
              status=0,
              extensions=[
                  CreateExtension('1.3.6.1.4.1.11129.2.4.5', critical=True),
              ]),
      ]))

Store('missing_response',
      'Missing a response for the cert',
      CA,
      Create(response_status=0, responses=[]))
447