#!/usr/local/lib/mailinabox/env/bin/python
# Utilities for installing and selecting SSL certificates.

import functools
import operator
import os
import os.path
import re
import shutil
import subprocess
import tempfile

from utils import shell, safe_domain_name, sort_domains


# SELECTING SSL CERTIFICATES FOR USE IN WEB

def get_ssl_certificates(env):
    """Map every domain covered by an installed certificate to its best certificate.

    Scans ``STORAGE_ROOT/ssl`` (and one directory level below it) for PEM
    files, pairs each certificate with a private key that has the same
    public numbers, and picks one certificate per domain.

    Returns a dict keyed by domain name whose values are dicts with the keys
    ``private-key``, ``certificate``, ``primary-domain`` and
    ``certificate_object``.
    """
    import datetime

    from cryptography.hazmat.primitives.asymmetric import dsa, rsa, ec
    from cryptography.x509 import Certificate

    # The certificates are all stored here:
    ssl_root = os.path.join(env["STORAGE_ROOT"], 'ssl')

    # List all of the files in the SSL directory and one level deep.
    def get_file_list():
        if not os.path.exists(ssl_root):
            return
        for fn in os.listdir(ssl_root):
            if fn == 'ssl_certificate.pem':
                # This is always a symbolic link to the certificate to use
                # for PRIMARY_HOSTNAME. Don't let it be eligible for use
                # because we could end up creating a symlink to itself ---
                # we want to find the cert that it should be a symlink to.
                continue
            fn = os.path.join(ssl_root, fn)
            if os.path.isfile(fn):
                yield fn
            elif os.path.isdir(fn):
                for fn1 in os.listdir(fn):
                    fn1 = os.path.join(fn, fn1)
                    if os.path.isfile(fn1):
                        yield fn1

    # Remember stuff.
    private_keys = {}
    certificates = []

    # Scan each of the files to find private keys and certificates.
    # We must load all of the private keys first before processing
    # certificates so that we can check that we have a private key
    # available before using a certificate.
    for fn in get_file_list():
        try:
            pem = load_pem(load_cert_chain(fn)[0])
        except ValueError:
            # Not a valid PEM format for a PEM type we care about.
            continue

        if isinstance(pem, Certificate):
            # It is a certificate.
            certificates.append({"filename": fn, "cert": pem})
        elif isinstance(pem, (rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey)):
            # It is a private key. Index it by its public numbers so that
            # certificates can look up their matching key.
            private_keys[pem.public_key().public_numbers()] = {"filename": fn, "key": pem}

    # Process the certificates.
    system_private_key = os.path.join(env['STORAGE_ROOT'], 'ssl', 'ssl_private_key.pem')
    domains = {}
    for cert in certificates:
        # What domains is this certificate good for?
        cert_domains, primary_domain = get_certificate_domains(cert["cert"])
        cert["primary_domain"] = primary_domain

        # Is there a private key file for this certificate?
        private_key = private_keys.get(cert["cert"].public_key().public_numbers())
        if not private_key:
            continue
        cert["private_key"] = private_key

        # Add this cert to the list of certs usable for the domains.
        for domain in cert_domains:
            # The primary hostname can only use a certificate mapped
            # to the system private key.
            if domain == env['PRIMARY_HOSTNAME'] and private_key["filename"] != system_private_key:
                continue
            domains.setdefault(domain, []).append(cert)

    # Rank the certificates to prefer good ones.
    now = datetime.datetime.utcnow()

    def preference(cert):
        x509_cert = cert["cert"]
        return (
            # must be valid NOW
            x509_cert.not_valid_before <= now <= x509_cert.not_valid_after,

            # prefer one that is not self-signed
            x509_cert.issuer != x509_cert.subject,

            # The entries above ensure that valid certificates are chosen
            # over invalid certificates. The entries below choose between
            # multiple valid certificates available for this domain.

            # Prefer one with the expiration furthest into the future so
            # that we can easily rotate to new certs as we get them.
            # We always choose the certificate that is good for the
            # longest period of time. This is important for how we
            # provision certificates for Let's Encrypt. To ensure that
            # we don't re-provision every night, we have to ensure that
            # if we choose to provision a certificate that it will
            # *actually* be used so the provisioning logic knows it
            # doesn't still need to provision a certificate for the
            # domain.
            x509_cert.not_valid_after,

            # in case a certificate is installed in multiple paths,
            # prefer the... lexicographically last one?
            cert["filename"],
        )

    ret = {}
    for domain, cert_list in domains.items():
        best = max(cert_list, key=preference)
        ret[domain] = {
            "private-key": best["private_key"]["filename"],
            "certificate": best["filename"],
            "primary-domain": best["primary_domain"],
            "certificate_object": best["cert"],
        }

    return ret


def get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=False, use_main_cert=True):
    """Return the certificate info dict to use for ``domain``.

    ``ssl_certificates`` is a mapping as returned by get_ssl_certificates().
    The lookup order is: the system certificate (only when ``use_main_cert``
    is set and ``domain`` is PRIMARY_HOSTNAME), an exact match, then a
    wildcard match. If nothing matches, the system certificate is returned
    unless ``allow_missing_cert`` is set, in which case ``None`` is returned.
    """
    if use_main_cert or not allow_missing_cert:
        # Get the system certificate info.
        ssl_private_key = os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_private_key.pem')
        ssl_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem')
        system_certificate = {
            "private-key": ssl_private_key,
            "certificate": ssl_certificate,
            "primary-domain": env['PRIMARY_HOSTNAME'],
            "certificate_object": load_pem(load_cert_chain(ssl_certificate)[0]),
        }

    if use_main_cert and domain == env['PRIMARY_HOSTNAME']:
        # The primary domain must use the server certificate because
        # it is hard-coded in some service configuration files.
        return system_certificate

    # Replace the left-most label with "*" to form the wildcard name.
    wildcard_domain = re.sub(r"^[^\.]+", "*", domain)
    if domain in ssl_certificates:
        return ssl_certificates[domain]
    if wildcard_domain in ssl_certificates:
        return ssl_certificates[wildcard_domain]
    if not allow_missing_cert:
        # No valid certificate is available for this domain! Return default files.
        return system_certificate

    # No valid certificate is available for this domain.
    return None


# PROVISIONING CERTIFICATES FROM LETSENCRYPT

def get_certificates_to_provision(env, limit_domains=None, show_valid_certs=True):
    """Work out which web domains need a certificate provisioned.

    Returns a tuple ``(domains_to_provision, domains_cant_provision)`` where
    the first item is a set of domain names and the second is a dict mapping
    a domain name to a human-readable reason it was skipped.
    """
    # Get a set of domain names that we can provision certificates for
    # using certbot. We start with domains that the box is serving web
    # for and subtract:
    # * domains not in limit_domains if limit_domains is not empty
    # * domains with custom "A" records, i.e. they are hosted elsewhere
    # * domains with actual "A" records that point elsewhere (misconfiguration)
    # * domains that already have certificates that will be valid for a while

    from web_update import get_web_domains
    from status_checks import query_dns, normalize_ip

    existing_certs = get_ssl_certificates(env)

    plausible_web_domains = get_web_domains(env, exclude_dns_elsewhere=False)
    actual_web_domains = get_web_domains(env)

    domains_to_provision = set()
    domains_cant_provision = {}

    for domain in plausible_web_domains:
        # Skip domains that the user doesn't want to provision now.
        if limit_domains and domain not in limit_domains:
            continue

        # Check that there isn't an explicit A/AAAA record.
        if domain not in actual_web_domains:
            domains_cant_provision[domain] = "The domain has a custom DNS A/AAAA record that points the domain elsewhere, so there is no point to installing a TLS certificate here and we could not automatically provision one anyway because provisioning requires access to the website (which isn't here)."
            continue

        # Does the domain resolve to this machine in public DNS? If not,
        # we can't do domain control validation. If IPv6 is configured,
        # make sure both IPv4 and IPv6 are correct because we don't know
        # how Let's Encrypt will connect.
        bad_dns = []
        for rtype, value in [("A", env["PUBLIC_IP"]), ("AAAA", env.get("PUBLIC_IPV6"))]:
            if not value:
                continue  # IPv6 is not configured
            response = query_dns(domain, rtype)
            if response != normalize_ip(value):
                bad_dns.append(f"{response} ({rtype})")

        if bad_dns:
            domains_cant_provision[domain] = "The domain name does not resolve to this machine: " \
                + (", ".join(bad_dns)) \
                + "."
            continue

        # DNS is all good.

        # Check for a good existing cert.
        existing_cert = get_domain_ssl_files(domain, existing_certs, env, use_main_cert=False, allow_missing_cert=True)
        if existing_cert:
            existing_cert_check = check_certificate(
                domain, existing_cert['certificate'], existing_cert['private-key'],
                warn_if_expiring_soon=14)
            if existing_cert_check[0] == "OK":
                if show_valid_certs:
                    domains_cant_provision[domain] = "The domain has a valid certificate already. ({} Certificate: {}, private key {})".format(
                        existing_cert_check[1], existing_cert['certificate'], existing_cert['private-key'])
                continue

        domains_to_provision.add(domain)

    return (domains_to_provision, domains_cant_provision)


def _build_csr_pem(key_file, domain_list):
    """Return a PEM-encoded CSR for ``domain_list`` signed with the key in ``key_file``."""
    # We could use openssl, but certbot requires
    # that the CN domain and SAN domains match
    # the domain list passed to certbot, and adding
    # SAN domains openssl req is ridiculously complicated.
    # subprocess.check_output([
    #     "openssl", "req", "-new",
    #     "-key", key_file,
    #     "-out", csr_file.name,
    #     "-subj", "/CN=" + domain_list[0],
    #     "-sha256" ])
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.serialization import Encoding
    from cryptography.hazmat.primitives import hashes
    from cryptography.x509.oid import NameOID

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domain_list[0])]))
    builder = builder.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
    builder = builder.add_extension(x509.SubjectAlternativeName([x509.DNSName(d) for d in domain_list]), critical=False)
    request = builder.sign(load_pem(load_cert_chain(key_file)[0]), hashes.SHA256(), default_backend())
    return request.public_bytes(Encoding.PEM)


def provision_certificates(env, limit_domains):
    """Provision certificates with certbot for every domain that needs one.

    Returns a list describing what happened: one dict per domain or domain
    group with the keys ``domains``, ``log`` and ``result`` ("skipped",
    "installed" or "error"), followed by whatever post_install_func()
    returns.
    """
    # What domains should we provision certificates for? And what
    # errors prevent provisioning for other domains.
    domains, domains_cant_provision = get_certificates_to_provision(env, limit_domains=limit_domains)

    # Build a list of what happened on each domain or domain-set.
    ret = [
        {
            "domains": [domain],
            "log": [error],
            "result": "skipped",
        }
        for domain, error in domains_cant_provision.items()
    ]

    # Break into groups by DNS zone: Group every domain with its parent domain, if
    # its parent domain is in the list of domains to request a certificate for.
    # Start with the zones so that if the zone doesn't need a certificate itself,
    # its children will still be grouped together. Sort the provision domains to
    # put parents ahead of children.
    # Since Let's Encrypt requests are limited to 100 domains at a time,
    # we'll create a list of lists of domains where the inner lists have
    # at most 100 items. By sorting we also get the DNS zone domain as the first
    # entry in each list (unless we overflow beyond 100) which ends up as the
    # primary domain listed in each certificate.
    from dns_update import get_dns_zones
    certs = {}
    for zone, _zonefile in get_dns_zones(env):
        certs[zone] = [[]]
    for domain in sort_domains(domains, env):
        # Does the domain end with any domain we've seen so far.
        for parent in certs:
            if domain.endswith("." + parent):
                # Add this to the parent's list of domains.
                # Start a new group if the list already has
                # 100 items.
                if len(certs[parent][-1]) == 100:
                    certs[parent].append([])
                certs[parent][-1].append(domain)
                break
        else:
            # This domain is not a child of any domain we've seen yet, so
            # start a new group. This shouldn't happen since every zone
            # was already added.
            certs[domain] = [[domain]]

    # Flatten to a list of lists of domains (from a mapping). Remove empty
    # lists (zones with no domains that need certs).
    certs = functools.reduce(operator.iadd, certs.values(), [])
    certs = [group for group in certs if group]

    # Prepare to provision.

    # Where should we put our Let's Encrypt account info and state cache.
    account_path = os.path.join(env['STORAGE_ROOT'], 'ssl/lets_encrypt')
    os.makedirs(account_path, exist_ok=True)

    # Provision certificates.
    for domain_list in certs:
        ret.append({
            "domains": domain_list,
            "log": [],
        })
        try:
            # Create a CSR file for our master private key so that certbot
            # uses our private key.
            key_file = os.path.join(env['STORAGE_ROOT'], 'ssl', 'ssl_private_key.pem')
            with tempfile.NamedTemporaryFile() as csr_file:
                csr_file.write(_build_csr_pem(key_file, domain_list))
                # certbot reads the file by name, so make sure it is on disk.
                csr_file.flush()

                # Provision, writing to a temporary file.
                webroot = os.path.join(account_path, 'webroot')
                os.makedirs(webroot, exist_ok=True)
                with tempfile.TemporaryDirectory() as d:
                    cert_file = os.path.join(d, 'cert_and_chain.pem')
                    print("Provisioning TLS certificates for " + ", ".join(domain_list) + ".")
                    certbotret = subprocess.check_output([
                        "certbot",
                        "certonly",
                        #"-v", # just enough to see ACME errors
                        "--non-interactive", # will fail if user hasn't registered during Mail-in-a-Box setup

                        "-d", ",".join(domain_list), # first will be main domain

                        "--csr", csr_file.name, # use our private key; unfortunately this doesn't work with auto-renew so we need to save cert manually
                        "--cert-path", os.path.join(d, 'cert'), # we only use the full chain
                        "--chain-path", os.path.join(d, 'chain'), # we only use the full chain
                        "--fullchain-path", cert_file,

                        "--webroot", "--webroot-path", webroot,

                        "--config-dir", account_path,
                        #"--staging",
                    ], stderr=subprocess.STDOUT).decode("utf8")
                    install_cert_copy_file(cert_file, env)

            ret[-1]["log"].append(certbotret)
            ret[-1]["result"] = "installed"
        except subprocess.CalledProcessError as e:
            ret[-1]["log"].append(e.output.decode("utf8"))
            ret[-1]["result"] = "error"
        except Exception as e:
            # Record the failure for this group and carry on with the next.
            ret[-1]["log"].append(str(e))
            ret[-1]["result"] = "error"

    # Run post-install steps.
    ret.extend(post_install_func(env))

    # Return what happened with each certificate request.
    return ret


def provision_certificates_cmdline():
    """Command-line entry point: provision certificates and print the outcome.

    Every argument other than ``-q`` is taken as a domain to limit
    provisioning to. ``-q`` suppresses output for skipped domains.
    """
    import sys
    from exclusiveprocess import Lock

    from utils import load_environment

    Lock(die=True).forever()
    env = load_environment()

    quiet = False
    domains = []

    for arg in sys.argv[1:]:
        if arg == "-q":
            quiet = True
        else:
            domains.append(arg)

    # Go.
    status = provision_certificates(env, limit_domains=domains)

    # Show what happened.
    for request in status:
        if isinstance(request, str):
            print(request)
        else:
            if quiet and request['result'] == 'skipped':
                continue
            print(request['result'] + ":", ", ".join(request['domains']) + ":")
            for line in request["log"]:
                print(line)
            print()


# INSTALLING A NEW CERTIFICATE FROM THE CONTROL PANEL

def create_csr(domain, ssl_key, country_code, env):
    """Run ``openssl req`` to create a CSR for ``domain`` and return its output."""
    return shell("check_output", [
        "openssl", "req", "-new",
        "-key", ssl_key,
        "-sha256",
        "-subj", f"/C={country_code}/CN={domain}"])


def install_cert(domain, ssl_cert, ssl_chain, env, raw=False):
    """Validate and install a certificate (plus chain) supplied as PEM text.

    Returns an error message string if validation fails. Otherwise returns
    the post-install messages: as a list when ``raw`` is true, else joined
    with newlines.
    """
    # Write the combined cert+chain to a temporary path and validate that it is OK.
    # The certificate always goes above the chain.
    fd, fn = tempfile.mkstemp('.pem')
    installed = False
    try:
        with os.fdopen(fd, "wb") as f:
            f.write((ssl_cert + '\n' + ssl_chain).encode("ascii"))

        # Do validation on the certificate before installing it.
        ssl_private_key = os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_private_key.pem')
        cert_status, cert_status_details = check_certificate(domain, fn, ssl_private_key)
        if cert_status != "OK":
            if cert_status == "SELF-SIGNED":
                cert_status = "This is a self-signed certificate. I can't install that."
            if cert_status_details is not None:
                cert_status += " " + cert_status_details
            return cert_status

        # Copy certificate into ssl directory.
        install_cert_copy_file(fn, env)
        installed = True
    finally:
        # Don't leave the temporary file behind on a validation failure or
        # on an unexpected exception.
        if not installed and os.path.exists(fn):
            os.unlink(fn)

    # Run post-install steps.
    ret = post_install_func(env)
    if raw:
        return ret
    return "\n".join(ret)


def install_cert_copy_file(fn, env):
    """Move the certificate file ``fn`` into ``STORAGE_ROOT/ssl`` under a unique name."""
    # Where to put it?
    # Make a unique path for the certificate.
    from cryptography.hazmat.primitives import hashes
    from binascii import hexlify

    cert = load_pem(load_cert_chain(fn)[0])
    _all_domains, cn = get_certificate_domains(cert)
    path = "{}-{}-{}.pem".format(
        safe_domain_name(cn), # common name, which should be filename safe because it is IDNA-encoded, but in case of a malformed cert make sure it's ok to use as a filename
        cert.not_valid_after.date().isoformat().replace("-", ""), # expiration date
        hexlify(cert.fingerprint(hashes.SHA256())).decode("ascii")[0:8], # fingerprint prefix
    )
    ssl_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl', path)

    # Install the certificate.
    os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)
    shutil.move(fn, ssl_certificate)


def post_install_func(env):
    """Point the system certificate symlink at the best cert and reload services.

    Returns a list of status messages.
    """
    ret = []

    # Get the certificate to use for PRIMARY_HOSTNAME.
    ssl_certificates = get_ssl_certificates(env)
    cert = get_domain_ssl_files(env['PRIMARY_HOSTNAME'], ssl_certificates, env, use_main_cert=False)
    if not cert:
        # Ruh-row, we don't have any certificate usable
        # for the primary hostname.
        ret.append("there is no valid certificate for " + env['PRIMARY_HOSTNAME'])

    # Symlink the best cert for PRIMARY_HOSTNAME to the system
    # certificate path, which is hard-coded for various purposes, and then
    # restart postfix and dovecot.
    system_ssl_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem')
    if cert and os.readlink(system_ssl_certificate) != cert['certificate']:
        # Update symlink.
        ret.append("updating primary certificate")
        ssl_certificate = cert['certificate']
        os.unlink(system_ssl_certificate)
        os.symlink(ssl_certificate, system_ssl_certificate)

        # Restart postfix and dovecot so they pick up the new file.
        shell('check_call', ["/usr/sbin/service", "postfix", "restart"])
        shell('check_call', ["/usr/sbin/service", "dovecot", "restart"])
        ret.append("mail services restarted")

        # The DANE TLSA record will remain valid so long as the private key
        # hasn't changed. We don't ever change the private key automatically.
        # If the user does it, they must manually update DNS.

    # Update the web configuration so nginx picks up the new certificate file.
    from web_update import do_web_update
    ret.append(do_web_update(env))

    return ret


# VALIDATION OF CERTIFICATES

def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring_soon=10, rounded_time=False, just_check_domain=False):
    """Check that a certificate file (and optionally a key) is good for ``domain``.

    Returns a ``(status, details)`` tuple. ``status`` is ``"OK"``,
    ``"SELF-SIGNED"`` or a human-readable problem description. ``details``
    is expiry information when the status is OK (``None`` when
    ``just_check_domain`` short-circuits), raw ``openssl verify`` output for
    unknown verification problems, and ``None`` otherwise.

    ``domain`` and ``ssl_private_key`` may each be ``None`` to skip the
    corresponding check.
    """
    # Check that the ssl_certificate & ssl_private_key files are good
    # for the provided domain.

    from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec
    from cryptography.x509 import Certificate

    # The ssl_certificate file may contain a chain of certificates. We'll
    # need to split that up before we can pass anything to openssl or
    # parse them in Python. Parse it with the cryptography library.
    try:
        ssl_cert_chain = load_cert_chain(ssl_certificate)
        cert = load_pem(ssl_cert_chain[0])
        if not isinstance(cert, Certificate):
            msg = "This is not a certificate file."
            raise ValueError(msg)
    except ValueError as e:
        return (f"There is a problem with the certificate file: {e!s}", None)

    # First check that the domain name is one of the names allowed by
    # the certificate.
    if domain is not None:
        certificate_names, _cert_primary_name = get_certificate_domains(cert)

        # Check that the domain appears among the acceptable names, or a wildcard
        # form of the domain name (which is a stricter check than the specs but
        # should work in normal cases).
        wildcard_domain = re.sub(r"^[^\.]+", "*", domain)
        if domain not in certificate_names and wildcard_domain not in certificate_names:
            return ("The certificate is for the wrong domain name. It is for {}.".format(", ".join(sorted(certificate_names))), None)

    # Second, check that the certificate matches the private key.
    if ssl_private_key is not None:
        try:
            with open(ssl_private_key, 'rb') as f:
                priv_key = load_pem(f.read())
        except ValueError as e:
            return (f"The private key file {ssl_private_key} is not a private key file: {e!s}", None)

        if not isinstance(priv_key, (rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey)):
            return (f"The private key file {ssl_private_key} is not a private key file.", None)

        if priv_key.public_key().public_numbers() != cert.public_key().public_numbers():
            return (f"The certificate does not correspond to the private key at {ssl_private_key}.", None)

        # We could also use the openssl command line tool to get the modulus
        # listed in each file. The output of each command below looks like "Modulus=XXXXX".
        # $ openssl rsa -inform PEM -noout -modulus -in ssl_private_key
        # $ openssl x509 -in ssl_certificate -noout -modulus

    # Third, check if the certificate is self-signed. Return a special flag string.
    if cert.issuer == cert.subject:
        return ("SELF-SIGNED", None)

    # When selecting which certificate to use for non-primary domains, we check if the primary
    # certificate or a www-parent-domain certificate is good for the domain. There's no need
    # to run extra checks beyond this point.
    if just_check_domain:
        return ("OK", None)

    # Check that the certificate hasn't expired. The datetimes returned by the
    # certificate are 'naive' and in UTC. We need to get the current time in UTC.
    import datetime
    now = datetime.datetime.utcnow()
    if not (cert.not_valid_before <= now <= cert.not_valid_after):
        return (f"The certificate has expired or is not yet valid. It is valid from {cert.not_valid_before} to {cert.not_valid_after}.", None)

    # Next validate that the certificate is valid. This checks whether the certificate
    # is self-signed, that the chain of trust makes sense, that it is signed by a CA
    # that Ubuntu has installed on this machine's list of CAs, and I think that it hasn't
    # expired.

    # The certificate chain has to be passed separately and is given via STDIN.
    # This command returns a non-zero exit status in most cases, so trap errors.
    retcode, verifyoutput = shell('check_output', [
        "openssl",
        "verify", "-verbose",
        "-purpose", "sslserver", "-policy_check",]
        + ([] if len(ssl_cert_chain) == 1 else ["-untrusted", "/proc/self/fd/0"])
        + [ssl_certificate],
        input=b"\n\n".join(ssl_cert_chain[1:]),
        trap=True)

    # OpenSSL 1.x reports "self signed certificate"; OpenSSL 3.x reports
    # "self-signed certificate". Accept either spelling.
    if "self signed" in verifyoutput or "self-signed" in verifyoutput:
        # Certificate is self-signed. Probably we detected this above.
        return ("SELF-SIGNED", None)

    if retcode != 0:
        if "unable to get local issuer certificate" in verifyoutput:
            return (f"The certificate is missing an intermediate chain or the intermediate chain is incorrect or incomplete. ({verifyoutput})", None)

        # There is some unknown problem. Return the `openssl verify` raw output.
        return ("There is a problem with the certificate.", verifyoutput.strip())

    # `openssl verify` returned a zero exit status so the cert is currently
    # good.

    # But is it expiring soon?
    cert_expiration_date = cert.not_valid_after
    ndays = (cert_expiration_date - now).days
    if not rounded_time or ndays <= 10:
        # Yikes better renew soon!
        expiry_info = f"The certificate expires in {ndays:d} days on {cert_expiration_date.date().isoformat()}."
    else:
        # We'll renew it with Lets Encrypt.
        expiry_info = f"The certificate expires on {cert_expiration_date.date().isoformat()}."

    if warn_if_expiring_soon and ndays <= warn_if_expiring_soon:
        # Warn on day 10 to give 4 days for us to automatically renew the
        # certificate, which occurs on day 14.
        return ("The certificate is expiring soon: " + expiry_info, None)

    # Return the special OK code.
    return ("OK", expiry_info)


def load_cert_chain(pemfile):
    """Split a PEM file into its individual PEM blocks.

    Returns a non-empty list of ``bytes`` objects, one per block, in file
    order. Raises ``ValueError`` if the file contains no PEM block.
    """
    # A certificate .pem file may contain a chain of certificates.
    # Load the file and split them apart.
    re_pem = rb"(-+BEGIN (?:.+)-+[\r\n]+(?:[A-Za-z0-9+/=]{1,64}[\r\n]+)+-+END (?:.+)-+[\r\n]+)"
    with open(pemfile, "rb") as f:
        pem = f.read() + b"\n" # ensure trailing newline
    pemblocks = re.findall(re_pem, pem)
    if not pemblocks:
        msg = "File does not contain valid PEM data."
        raise ValueError(msg)
    return pemblocks


def load_pem(pem):
    """Parse one PEM block (``bytes``) into a ``cryptography`` object.

    Returns a private key object for ``... PRIVATE KEY`` blocks and a
    certificate object for ``CERTIFICATE`` blocks. Raises ``ValueError`` if
    the data has no PEM header or the PEM type is not supported.
    """
    # Parse a "---BEGIN .... END---" PEM string and return a Python object for it
    # using classes from the cryptography package.
    from cryptography.x509 import load_pem_x509_certificate
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend

    pem_type = re.match(br"-+BEGIN (.*?)-+[\r\n]", pem)
    if pem_type is None:
        msg = "File is not a valid PEM-formatted file."
        raise ValueError(msg)
    pem_type = pem_type.group(1)
    if pem_type.endswith(b"PRIVATE KEY"):
        return serialization.load_pem_private_key(pem, password=None, backend=default_backend())
    if pem_type == b"CERTIFICATE":
        return load_pem_x509_certificate(pem, default_backend())
    raise ValueError("Unsupported PEM object type: " + pem_type.decode("ascii", "replace"))


def get_certificate_domains(cert):
    """Return ``(names, cn)`` for a certificate.

    ``names`` is the set of domain names taken from the Subject Common Name
    and the DNS Subject Alternative Names. ``cn`` is the Common Name, or
    ``None`` if the certificate has none.
    """
    from cryptography.x509 import DNSName, ExtensionNotFound, OID_COMMON_NAME, OID_SUBJECT_ALTERNATIVE_NAME
    import idna

    names = set()
    cn = None

    # The domain may be found in the Subject Common Name (CN). This comes back as an IDNA (ASCII)
    # string, which is the format we store domains in - so good.
    try:
        cn = cert.subject.get_attributes_for_oid(OID_COMMON_NAME)[0].value
        names.add(cn)
    except IndexError:
        # No common name? Certificate is probably generated incorrectly.
        # But we'll let it error-out when it doesn't find the domain.
        pass

    # ... or be one of the Subject Alternative Names. The cryptography library handily IDNA-decodes
    # the names for us. We must encode back to ASCII, but wildcard certificates can't pass through
    # IDNA encoding/decoding so we must special-case. See https://github.com/pyca/cryptography/pull/2071.
    def idna_decode_dns_name(dns_name):
        if dns_name.startswith("*."):
            return "*." + idna.encode(dns_name[2:]).decode('ascii')
        return idna.encode(dns_name).decode('ascii')

    try:
        sans = cert.extensions.get_extension_for_oid(OID_SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(DNSName)
        names.update(idna_decode_dns_name(san) for san in sans)
    except ExtensionNotFound:
        pass

    return names, cn


if __name__ == "__main__":
    # Provision certificates.
    provision_certificates_cmdline()