# Copyright 2014 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Routines to generate root and server certificates. Certificate Naming Conventions: ca_cert: crypto.X509 for the certificate authority (w/ both the pub & priv keys) cert: a crypto.X509 certificate (w/ just the pub key) cert_str: a certificate string (w/ just the pub cert) key: a private crypto.PKey (from ca or pem) ca_cert_str: a certificae authority string (w/ both the pub & priv certs) """ import logging import os import socket import time openssl_import_error = None Error = None SSL_METHOD = None SysCallError = None VERIFY_PEER = None ZeroReturnError = None FILETYPE_PEM = None try: from OpenSSL import crypto, SSL Error = SSL.Error SSL_METHOD = SSL.SSLv23_METHOD SysCallError = SSL.SysCallError VERIFY_PEER = SSL.VERIFY_PEER ZeroReturnError = SSL.ZeroReturnError FILETYPE_PEM = crypto.FILETYPE_PEM except ImportError, e: openssl_import_error = e def get_ssl_context(method=SSL_METHOD): # One of: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD if openssl_import_error: raise openssl_import_error # pylint: disable=raising-bad-type return SSL.Context(method) class WrappedConnection(object): def __init__(self, obj): self._wrapped_obj = obj def __getattr__(self, attr): if attr in self.__dict__: return getattr(self, attr) return getattr(self._wrapped_obj, attr) def recv(self, buflen=1024, flags=0): try: return self._wrapped_obj.recv(buflen, flags) except SSL.SysCallError, e: if e.args[1] == 'Unexpected EOF': return '' raise except SSL.ZeroReturnError: return '' def get_ssl_connection(context, connection): return WrappedConnection(SSL.Connection(context, connection)) def load_privatekey(key, filetype=FILETYPE_PEM): """Loads obj private key object from string.""" return crypto.load_privatekey(filetype, key) def load_cert(cert_str, filetype=FILETYPE_PEM): """Loads obj cert object from string.""" return crypto.load_certificate(filetype, cert_str) def _dump_privatekey(key, filetype=FILETYPE_PEM): """Dumps obj private key object to string.""" return crypto.dump_privatekey(filetype, key) def _dump_cert(cert, filetype=FILETYPE_PEM): """Dumps obj cert object to string.""" return crypto.dump_certificate(filetype, cert) def generate_dummy_ca_cert(subject='_WebPageReplayCert'): """Generates dummy certificate authority. Args: subject: a string representing the desired root cert issuer Returns: A tuple of the public key and the private key strings for the root certificate """ if openssl_import_error: raise openssl_import_error # pylint: disable=raising-bad-type key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, 1024) ca_cert = crypto.X509() ca_cert.set_serial_number(int(time.time()*10000)) ca_cert.set_version(2) ca_cert.get_subject().CN = subject ca_cert.get_subject().O = subject ca_cert.gmtime_adj_notBefore(-60 * 60 * 24 * 365 * 2) ca_cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 2) ca_cert.set_issuer(ca_cert.get_subject()) ca_cert.set_pubkey(key) ca_cert.add_extensions([ crypto.X509Extension('basicConstraints', True, 'CA:TRUE'), crypto.X509Extension('nsCertType', True, 'sslCA'), crypto.X509Extension('extendedKeyUsage', True, ('serverAuth,clientAuth,emailProtection,' 'timeStamping,msCodeInd,msCodeCom,msCTLSign,' 'msSGC,msEFS,nsSGC')), crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'), crypto.X509Extension('subjectKeyIdentifier', False, 'hash', subject=ca_cert), ]) ca_cert.sign(key, 'sha256') key_str = _dump_privatekey(key) ca_cert_str = _dump_cert(ca_cert) return ca_cert_str, key_str def get_host_cert(host, port=443): """Contacts the host and returns its certificate.""" host_certs = [] def verify_cb(conn, cert, errnum, depth, ok): host_certs.append(cert) # Return True to indicates that the certificate was ok. return True context = SSL.Context(SSL.SSLv23_METHOD) context.set_verify(SSL.VERIFY_PEER, verify_cb) # Demand a certificate s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) connection = SSL.Connection(context, s) try: connection.connect((host, port)) connection.send('') except SSL.SysCallError: pass except socket.gaierror: logging.debug('Host name is not valid') finally: connection.shutdown() connection.close() if not host_certs: logging.warning('Unable to get host certificate from %s:%s', host, port) return '' return _dump_cert(host_certs[-1]) def write_dummy_ca_cert(ca_cert_str, key_str, cert_path): """Writes four certificate files. For example, if cert_path is "mycert.pem": mycert.pem - CA plus private key mycert-cert.pem - CA in PEM format mycert-cert.cer - CA for Android mycert-cert.p12 - CA in PKCS12 format for Windows devices Args: cert_path: path string such as "mycert.pem" ca_cert_str: certificate string key_str: private key string """ dirname = os.path.dirname(cert_path) if dirname and not os.path.exists(dirname): os.makedirs(dirname) root_path = os.path.splitext(cert_path)[0] ca_cert_path = root_path + '-cert.pem' android_cer_path = root_path + '-cert.cer' windows_p12_path = root_path + '-cert.p12' # Dump the CA plus private key with open(cert_path, 'w') as f: f.write(key_str) f.write(ca_cert_str) # Dump the certificate in PEM format with open(ca_cert_path, 'w') as f: f.write(ca_cert_str) # Create a .cer file with the same contents for Android with open(android_cer_path, 'w') as f: f.write(ca_cert_str) ca_cert = load_cert(ca_cert_str) key = load_privatekey(key_str) # Dump the certificate in PKCS12 format for Windows devices with open(windows_p12_path, 'w') as f: p12 = crypto.PKCS12() p12.set_certificate(ca_cert) p12.set_privatekey(key) f.write(p12.export()) def generate_cert(root_ca_cert_str, server_cert_str, server_host): """Generates a cert_str with the sni field in server_cert_str signed by the root_ca_cert_str. Args: root_ca_cert_str: PEM formatted string representing the root cert server_cert_str: PEM formatted string representing cert server_host: host name to use if there is no server_cert_str Returns: a PEM formatted certificate string """ if openssl_import_error: raise openssl_import_error # pylint: disable=raising-bad-type common_name = server_host if server_cert_str: cert = load_cert(server_cert_str) common_name = cert.get_subject().commonName else: cert = crypto.X509() ca_cert = load_cert(root_ca_cert_str) key = load_privatekey(root_ca_cert_str) req = crypto.X509Req() req.get_subject().CN = common_name req.set_pubkey(ca_cert.get_pubkey()) req.sign(key, 'sha256') cert.gmtime_adj_notBefore(-60 * 60) cert.gmtime_adj_notAfter(60 * 60 * 24 * 30) cert.set_issuer(ca_cert.get_subject()) cert.set_subject(req.get_subject()) cert.set_serial_number(int(time.time()*10000)) cert.set_pubkey(req.get_pubkey()) cert.sign(key, 'sha256') return _dump_cert(cert)