1#!/usr/bin/env python 2# 3# This file is part of pyasn1-modules software. 4# 5# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> 6# License: http://snmplabs.com/pyasn1/license.html 7# 8import hashlib 9import sys 10 11try: 12 import urllib2 13 14except ImportError: 15 import urllib.request as urllib2 16 17from pyasn1.codec.der import decoder 18from pyasn1.codec.der import encoder 19from pyasn1.type import univ 20 21from pyasn1_modules import rfc2560 22from pyasn1_modules import rfc2459 23from pyasn1_modules import pem 24 25sha1oid = univ.ObjectIdentifier((1, 3, 14, 3, 2, 26)) 26 27 28# noinspection PyClassHasNoInit 29class ValueOnlyBitStringEncoder(encoder.encoder.BitStringEncoder): 30 # These methods just do not encode tag and length fields of TLV 31 def encodeTag(self, *args): 32 return '' 33 34 def encodeLength(self, *args): 35 return '' 36 37 def encodeValue(*args): 38 substrate, isConstructed = encoder.encoder.BitStringEncoder.encodeValue(*args) 39 # OCSP-specific hack follows: cut off the "unused bit count" 40 # encoded bit-string value. 41 return substrate[1:], isConstructed 42 43 def __call__(self, bitStringValue): 44 return self.encode(None, bitStringValue, defMode=True, maxChunkSize=0) 45 46 47valueOnlyBitStringEncoder = ValueOnlyBitStringEncoder() 48 49 50# noinspection PyShadowingNames 51def mkOcspRequest(issuerCert, userCert): 52 issuerTbsCertificate = issuerCert.getComponentByName('tbsCertificate') 53 issuerSubject = issuerTbsCertificate.getComponentByName('subject') 54 55 userTbsCertificate = userCert.getComponentByName('tbsCertificate') 56 userIssuer = userTbsCertificate.getComponentByName('issuer') 57 58 assert issuerSubject == userIssuer, '%s\n%s' % ( 59 issuerSubject.prettyPrint(), userIssuer.prettyPrint() 60 ) 61 62 userIssuerHash = hashlib.sha1( 63 encoder.encode(userIssuer) 64 ).digest() 65 66 issuerSubjectPublicKey = issuerTbsCertificate.getComponentByName('subjectPublicKeyInfo').getComponentByName( 67 'subjectPublicKey') 68 69 issuerKeyHash = hashlib.sha1( 70 valueOnlyBitStringEncoder(issuerSubjectPublicKey) 71 ).digest() 72 73 userSerialNumber = userTbsCertificate.getComponentByName('serialNumber') 74 75 # Build request object 76 77 request = rfc2560.Request() 78 79 reqCert = request.setComponentByName('reqCert').getComponentByName('reqCert') 80 81 hashAlgorithm = reqCert.setComponentByName('hashAlgorithm').getComponentByName('hashAlgorithm') 82 hashAlgorithm.setComponentByName('algorithm', sha1oid) 83 84 reqCert.setComponentByName('issuerNameHash', userIssuerHash) 85 reqCert.setComponentByName('issuerKeyHash', issuerKeyHash) 86 reqCert.setComponentByName('serialNumber', userSerialNumber) 87 88 ocspRequest = rfc2560.OCSPRequest() 89 90 tbsRequest = ocspRequest.setComponentByName('tbsRequest').getComponentByName('tbsRequest') 91 tbsRequest.setComponentByName('version', 'v1') 92 93 requestList = tbsRequest.setComponentByName('requestList').getComponentByName('requestList') 94 requestList.setComponentByPosition(0, request) 95 96 return ocspRequest 97 98 99def parseOcspResponse(ocspResponse): 100 responseStatus = ocspResponse.getComponentByName('responseStatus') 101 assert responseStatus == rfc2560.OCSPResponseStatus('successful'), responseStatus.prettyPrint() 102 responseBytes = ocspResponse.getComponentByName('responseBytes') 103 responseType = responseBytes.getComponentByName('responseType') 104 assert responseType == rfc2560.id_pkix_ocsp_basic, responseType.prettyPrint() 105 106 response = responseBytes.getComponentByName('response') 107 108 basicOCSPResponse, _ = decoder.decode( 109 response, asn1Spec=rfc2560.BasicOCSPResponse() 110 ) 111 112 tbsResponseData = basicOCSPResponse.getComponentByName('tbsResponseData') 113 114 response0 = tbsResponseData.getComponentByName('responses').getComponentByPosition(0) 115 116 return ( 117 tbsResponseData.getComponentByName('producedAt'), 118 response0.getComponentByName('certID'), 119 response0.getComponentByName('certStatus').getName(), 120 response0.getComponentByName('thisUpdate') 121 ) 122 123 124if len(sys.argv) != 2: 125 print("""Usage: 126$ cat CACertificate.pem userCertificate.pem | %s <ocsp-responder-url>""" % sys.argv[0]) 127 sys.exit(-1) 128else: 129 ocspUrl = sys.argv[1] 130 131# Parse CA and user certificates 132 133issuerCert, _ = decoder.decode( 134 pem.readPemBlocksFromFile( 135 sys.stdin, ('-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----') 136 )[1], 137 asn1Spec=rfc2459.Certificate() 138) 139# noinspection PyRedeclaration 140userCert, _ = decoder.decode( 141 pem.readPemBlocksFromFile( 142 sys.stdin, ('-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----') 143 )[1], 144 asn1Spec=rfc2459.Certificate() 145) 146 147# Build OCSP request 148 149ocspReq = mkOcspRequest(issuerCert, userCert) 150 151# Use HTTP POST to get response (see Appendix A of RFC 2560) 152# In case you need proxies, set the http_proxy env variable 153 154httpReq = urllib2.Request( 155 ocspUrl, 156 encoder.encode(ocspReq), 157 {'Content-Type': 'application/ocsp-request'} 158) 159httpRsp = urllib2.urlopen(httpReq).read() 160 161# Process OCSP response 162 163# noinspection PyRedeclaration 164ocspRsp, _ = decoder.decode(httpRsp, asn1Spec=rfc2560.OCSPResponse()) 165 166producedAt, certId, certStatus, thisUpdate = parseOcspResponse(ocspRsp) 167 168print('Certificate ID %s is %s at %s till %s\n' % (certId.getComponentByName('serialNumber'), 169 certStatus, producedAt, thisUpdate)) 170