Source code for iceprod.server.ssl_cert

"""
Functions relating to OpenSSL certificates.
"""

import os
import hashlib
import logging
from datetime import datetime
import uuid

from OpenSSL import SSL,crypto
from pyasn1.type import univ
from pyasn1.codec.der import encoder

from iceprod.core import functions

logger = logging.getLogger('ssl_cert')


[docs] def create_ca(cert_filename,key_filename,days=365,hostname=None): """Make a certificate authority and key pair""" cert_filename = os.path.abspath(os.path.expandvars(cert_filename)) key_filename = os.path.abspath(os.path.expandvars(key_filename)) logger.warning('making CA cert at %s',cert_filename) if not (os.path.exists(cert_filename) and os.path.exists(key_filename)): if hostname is None: # get hostname hostname = functions.gethostname().encode('utf-8') if hostname is None: raise Exception('Cannot get hostname') # create a key pair k = crypto.PKey() k.generate_key(crypto.TYPE_RSA, 4096) # create a self-signed cert cert = crypto.X509() cert.set_version(2) # version 3, since count starts at 0 cert.get_subject().C = "US" cert.get_subject().ST = "Wisconsin" cert.get_subject().L = "Madison" cert.get_subject().O = "University of Wisconsin-Madison" # noqa: E741 cert.get_subject().OU = "IceCube IceProd Root CA" cert.get_subject().CN = hostname cert.set_serial_number(1) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(days*24*60*60) cert.set_issuer(cert.get_subject()) cert.set_pubkey(k) # get integer public key for cert # pyOpenSSL doesn't provide a good access method, # so do it the hard way by extracting from key output pubkey = crypto.dump_privatekey(crypto.FILETYPE_TEXT, cert.get_pubkey()) logger.info('%r',pubkey) pubkey = [x.strip() for x in pubkey.split(b'\n') if len(x) > 0 and x.startswith(b' ')] logger.info('%r',pubkey) pubkey = int(b''.join(pubkey).replace(b':',b''),16) # make asn1 DER encoding seq = univ.Sequence() seq.setComponentByPosition(0,univ.Integer(pubkey)) seq.setComponentByPosition(1,univ.Integer(65537)) enc = encoder.encode(seq) # get hash of DER hash = hashlib.sha1(enc).hexdigest().encode('utf-8') # add extensions cert.add_extensions([ crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:1"), crypto.X509Extension(b"keyUsage", True, b"keyCertSign, cRLSign"), crypto.X509Extension(b"subjectKeyIdentifier", False, hash, subject=cert), crypto.X509Extension(b"subjectAltName", False, b'DNS:'+hostname.encode('utf-8')) ]) cert.sign(k, 'sha512') open(cert_filename, "wb").write( crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) open(key_filename, "wb").write( crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
[docs] def create_cert(cert_filename,key_filename,days=365,hostname=None, cacert=None,cakey=None,allow_resign=False): """Make a certificate and key pair""" cert_filename = os.path.abspath(os.path.expandvars(cert_filename)) key_filename = os.path.abspath(os.path.expandvars(key_filename)) logger.warning('making cert at %s',cert_filename) if cacert: cacert = os.path.abspath(os.path.expandvars(cacert)) cakey = os.path.abspath(os.path.expandvars(cakey)) logger.warning('with CA %s',cacert) if not (os.path.exists(cacert) and os.path.exists(cakey)): raise Exception('CA cert does not exist') if not (os.path.exists(cert_filename) and os.path.exists(key_filename)): if hostname is None: # get hostname hostname = functions.gethostname().encode('utf-8') if hostname is None: raise Exception('Cannot get hostname') # create a key pair k = crypto.PKey() k.generate_key(crypto.TYPE_RSA, 2048) if cacert is None or cakey is None: # self-signed cert = crypto.X509() else: # certificate request cert = crypto.X509Req() if cacert is None or cakey is None: # self-sign cert.get_subject().CN = hostname cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(days*24*60*60) cert.set_serial_number(uuid.uuid4().int) cert.set_issuer(cert.get_subject()) cert.set_pubkey(k) # add extensions exts = [crypto.X509Extension(b'subjectAltName', False, b'DNS:'+hostname.encode('utf-8'))] if allow_resign: exts.extend([ crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:0"), # crypto.X509Extension("keyUsage", True, # "keyCertSign, cRLSign"), # crypto.X509Extension("subjectKeyIdentifier", False, hash, # subject=cert), ]) cert.add_extensions(exts) cert.sign(k, 'sha512') else: cert.get_subject().C = "US" cert.get_subject().ST = "Wisconsin" cert.get_subject().L = "Madison" cert.get_subject().O = "University of Wisconsin-Madison" # noqa: E741 cert.get_subject().OU = "IceCube IceProd" cert.get_subject().CN = hostname # finish cert req cert.set_pubkey(k) cert.sign(k, 'sha512') # load CA cacert = os.path.abspath(os.path.expandvars(cacert)) cakey = os.path.abspath(os.path.expandvars(cakey)) ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM,open(cacert).read()) ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM,open(cakey).read()) # make actual cert and sign with CA cert2 = crypto.X509() cert2.set_subject(cert.get_subject()) cert2.set_serial_number(1) cert2.gmtime_adj_notBefore(0) cert2.gmtime_adj_notAfter(days*24*60*60) cert2.set_issuer(ca_cert.get_subject()) cert2.set_pubkey(cert.get_pubkey()) # add extensions exts = [crypto.X509Extension(b"subjectAltName", False, b'DNS:'+hostname.encode('utf-8'))] if allow_resign: exts.extend([ crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"), # crypto.X509Extension("keyUsage", True, # "keyCertSign, cRLSign"), # crypto.X509Extension("subjectKeyIdentifier", False, hash, # subject=cert), ]) cert2.add_extensions(exts) cert2.sign(ca_key, 'sha512') # overwrite cert req with real cert cert = cert2 open(cert_filename, "wb").write( crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) open(key_filename, "wb").write( crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
[docs] def verify_cert(cert_filename, key_filename): """Verify if cert and key match. Return False for failure, True for success. """ cert_filename = os.path.abspath(os.path.expandvars(cert_filename)) key_filename = os.path.abspath(os.path.expandvars(key_filename)) cert = crypto.load_certificate(crypto.FILETYPE_PEM,open(cert_filename).read()) key = crypto.load_privatekey(crypto.FILETYPE_PEM,open(key_filename).read()) # check date begin = cert.get_notBefore().decode('utf-8') logger.debug('begin: %r',begin) if datetime.strptime(begin, "%Y%m%d%H%M%SZ") > datetime.utcnow(): logger.error('cert only valid in future') return False end = cert.get_notAfter().decode('utf-8') logger.debug('end: %r',end) if datetime.strptime(end, "%Y%m%d%H%M%SZ") < datetime.utcnow(): logger.warning('cert has expired') return False # check matching cert and key ctx = SSL.Context(SSL.TLSv1_METHOD) ctx.use_privatekey(key) ctx.use_certificate(cert) try: ctx.check_privatekey() except SSL.Error: logger.warning('cert and key do not match') return False else: return True