|
|
|
|
@@ -1,7 +1,7 @@
|
|
|
|
|
#!/usr/local/lib/mailinabox/env/bin/python
|
|
|
|
|
# Utilities for installing and selecting SSL certificates.
|
|
|
|
|
|
|
|
|
|
import os, os.path, re, shutil
|
|
|
|
|
import os, os.path, re, shutil, subprocess, tempfile
|
|
|
|
|
|
|
|
|
|
from utils import shell, safe_domain_name, sort_domains
|
|
|
|
|
import idna
|
|
|
|
|
@@ -24,6 +24,16 @@ def get_ssl_certificates(env):
|
|
|
|
|
if not os.path.exists(ssl_root):
|
|
|
|
|
return
|
|
|
|
|
for fn in os.listdir(ssl_root):
|
|
|
|
|
if fn == 'ssl_certificate.pem':
|
|
|
|
|
# This is always a symbolic link
|
|
|
|
|
# to the certificate to use for
|
|
|
|
|
# PRIMARY_HOSTNAME. Don't let it
|
|
|
|
|
# be eligible for use because we
|
|
|
|
|
# could end up creating a symlink
|
|
|
|
|
# to itself --- we want to find
|
|
|
|
|
# the cert that it should be a
|
|
|
|
|
# symlink to.
|
|
|
|
|
continue
|
|
|
|
|
fn = os.path.join(ssl_root, fn)
|
|
|
|
|
if os.path.isfile(fn):
|
|
|
|
|
yield fn
|
|
|
|
|
@@ -74,6 +84,12 @@ def get_ssl_certificates(env):
|
|
|
|
|
|
|
|
|
|
# Add this cert to the list of certs usable for the domains.
|
|
|
|
|
for domain in cert_domains:
|
|
|
|
|
# The primary hostname can only use a certificate mapped
|
|
|
|
|
# to the system private key.
|
|
|
|
|
if domain == env['PRIMARY_HOSTNAME']:
|
|
|
|
|
if cert._private_key._filename != os.path.join(env['STORAGE_ROOT'], 'ssl', 'ssl_private_key.pem'):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
domains.setdefault(domain, []).append(cert)
|
|
|
|
|
|
|
|
|
|
# Sort the certificates to prefer good ones.
|
|
|
|
|
@@ -81,6 +97,7 @@ def get_ssl_certificates(env):
|
|
|
|
|
now = datetime.datetime.utcnow()
|
|
|
|
|
ret = { }
|
|
|
|
|
for domain, cert_list in domains.items():
|
|
|
|
|
#for c in cert_list: print(domain, c.not_valid_before, c.not_valid_after, "("+str(now)+")", c.issuer, c.subject, c._filename)
|
|
|
|
|
cert_list.sort(key = lambda cert : (
|
|
|
|
|
# must be valid NOW
|
|
|
|
|
cert.not_valid_before <= now <= cert.not_valid_after,
|
|
|
|
|
@@ -124,21 +141,22 @@ def get_ssl_certificates(env):
|
|
|
|
|
|
|
|
|
|
return ret
|
|
|
|
|
|
|
|
|
|
def get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=False, raw=False):
|
|
|
|
|
# Get the system certificate info.
|
|
|
|
|
ssl_private_key = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_private_key.pem'))
|
|
|
|
|
ssl_certificate = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem'))
|
|
|
|
|
system_certificate = {
|
|
|
|
|
"private-key": ssl_private_key,
|
|
|
|
|
"certificate": ssl_certificate,
|
|
|
|
|
"primary-domain": env['PRIMARY_HOSTNAME'],
|
|
|
|
|
"certificate_object": load_pem(load_cert_chain(ssl_certificate)[0]),
|
|
|
|
|
}
|
|
|
|
|
def get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=False, use_main_cert=True):
|
|
|
|
|
if use_main_cert:
|
|
|
|
|
# Get the system certificate info.
|
|
|
|
|
ssl_private_key = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_private_key.pem'))
|
|
|
|
|
ssl_certificate = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem'))
|
|
|
|
|
system_certificate = {
|
|
|
|
|
"private-key": ssl_private_key,
|
|
|
|
|
"certificate": ssl_certificate,
|
|
|
|
|
"primary-domain": env['PRIMARY_HOSTNAME'],
|
|
|
|
|
"certificate_object": load_pem(load_cert_chain(ssl_certificate)[0]),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if domain == env['PRIMARY_HOSTNAME']:
|
|
|
|
|
# The primary domain must use the server certificate because
|
|
|
|
|
# it is hard-coded in some service configuration files.
|
|
|
|
|
return system_certificate
|
|
|
|
|
if domain == env['PRIMARY_HOSTNAME']:
|
|
|
|
|
# The primary domain must use the server certificate because
|
|
|
|
|
# it is hard-coded in some service configuration files.
|
|
|
|
|
return system_certificate
|
|
|
|
|
|
|
|
|
|
wildcard_domain = re.sub("^[^\.]+", "*", domain)
|
|
|
|
|
if domain in ssl_certificates:
|
|
|
|
|
@@ -155,112 +173,97 @@ def get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=False
|
|
|
|
|
|
|
|
|
|
# PROVISIONING CERTIFICATES FROM LETSENCRYPT
|
|
|
|
|
|
|
|
|
|
def get_certificates_to_provision(env, show_extended_problems=True, force_domains=None):
|
|
|
|
|
# Get a set of domain names that we should now provision certificates
|
|
|
|
|
# for. Provision if a domain name has no valid certificate or if any
|
|
|
|
|
# certificate is expiring in 14 days. If provisioning anything, also
|
|
|
|
|
# provision certificates expiring within 30 days. The period between
|
|
|
|
|
# 14 and 30 days allows us to consolidate domains into multi-domain
|
|
|
|
|
# certificates for domains expiring around the same time.
|
|
|
|
|
def get_certificates_to_provision(env, limit_domains=None, show_valid_certs=True):
|
|
|
|
|
# Get a set of domain names that we can provision certificates for
|
|
|
|
|
# using certbot. We start with domains that the box is serving web
|
|
|
|
|
# for and subtract:
|
|
|
|
|
# * domains not in limit_domains if limit_domains is not empty
|
|
|
|
|
# * domains with custom "A" records, i.e. they are hosted elsewhere
|
|
|
|
|
# * domains with actual "A" records that point elsewhere
|
|
|
|
|
# * domains that already have certificates that will be valid for a while
|
|
|
|
|
|
|
|
|
|
from web_update import get_web_domains
|
|
|
|
|
from status_checks import query_dns, normalize_ip
|
|
|
|
|
|
|
|
|
|
import datetime
|
|
|
|
|
now = datetime.datetime.utcnow()
|
|
|
|
|
existing_certs = get_ssl_certificates(env)
|
|
|
|
|
|
|
|
|
|
# Get domains with missing & expiring certificates.
|
|
|
|
|
certs = get_ssl_certificates(env)
|
|
|
|
|
domains = set()
|
|
|
|
|
domains_if_any = set()
|
|
|
|
|
problems = { }
|
|
|
|
|
for domain in get_web_domains(env):
|
|
|
|
|
# If the user really wants a cert for certain domains, include it.
|
|
|
|
|
if force_domains:
|
|
|
|
|
if force_domains == "ALL" or (isinstance(force_domains, list) and domain in force_domains):
|
|
|
|
|
domains.add(domain)
|
|
|
|
|
plausible_web_domains = get_web_domains(env, exclude_dns_elsewhere=False)
|
|
|
|
|
actual_web_domains = get_web_domains(env)
|
|
|
|
|
|
|
|
|
|
domains_to_provision = set()
|
|
|
|
|
domains_cant_provision = { }
|
|
|
|
|
|
|
|
|
|
for domain in plausible_web_domains:
|
|
|
|
|
# Skip domains that the user doesn't want to provision now.
|
|
|
|
|
if limit_domains and domain not in limit_domains:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Include this domain if its certificate is missing, self-signed, or expiring soon.
|
|
|
|
|
try:
|
|
|
|
|
cert = get_domain_ssl_files(domain, certs, env, allow_missing_cert=True)
|
|
|
|
|
except FileNotFoundError as e:
|
|
|
|
|
# system certificate is not present
|
|
|
|
|
problems[domain] = "Error: " + str(e)
|
|
|
|
|
continue
|
|
|
|
|
if cert is None:
|
|
|
|
|
# No valid certificate available.
|
|
|
|
|
domains.add(domain)
|
|
|
|
|
# Check that there isn't an explicit A/AAAA record.
|
|
|
|
|
if domain not in actual_web_domains:
|
|
|
|
|
domains_cant_provision[domain] = "The domain has a custom DNS A/AAAA record that points the domain elsewhere, so there is no point to installing a TLS certificate here and we could not automatically provision one anyway because provisioning requires access to the website (which isn't here)."
|
|
|
|
|
|
|
|
|
|
# Check that the DNS resolves to here.
|
|
|
|
|
else:
|
|
|
|
|
cert = cert["certificate_object"]
|
|
|
|
|
if cert.issuer == cert.subject:
|
|
|
|
|
# This is self-signed. Get a real one.
|
|
|
|
|
domains.add(domain)
|
|
|
|
|
|
|
|
|
|
# Does the domain resolve to this machine in public DNS? If not,
|
|
|
|
|
# we can't do domain control validation. For IPv6 is configured,
|
|
|
|
|
# make sure both IPv4 and IPv6 are correct because we don't know
|
|
|
|
|
# how Let's Encrypt will connect.
|
|
|
|
|
bad_dns = []
|
|
|
|
|
for rtype, value in [("A", env["PUBLIC_IP"]), ("AAAA", env.get("PUBLIC_IPV6"))]:
|
|
|
|
|
if not value: continue # IPv6 is not configured
|
|
|
|
|
response = query_dns(domain, rtype)
|
|
|
|
|
if response != normalize_ip(value):
|
|
|
|
|
bad_dns.append("%s (%s)" % (response, rtype))
|
|
|
|
|
|
|
|
|
|
if bad_dns:
|
|
|
|
|
domains_cant_provision[domain] = "The domain name does not resolve to this machine: " \
|
|
|
|
|
+ (", ".join(bad_dns)) \
|
|
|
|
|
+ "."
|
|
|
|
|
|
|
|
|
|
# Valid certificate today, but is it expiring soon?
|
|
|
|
|
elif cert.not_valid_after-now < datetime.timedelta(days=14):
|
|
|
|
|
domains.add(domain)
|
|
|
|
|
elif cert.not_valid_after-now < datetime.timedelta(days=30):
|
|
|
|
|
domains_if_any.add(domain)
|
|
|
|
|
else:
|
|
|
|
|
# DNS is all good.
|
|
|
|
|
|
|
|
|
|
# It's valid. Should we report its validness?
|
|
|
|
|
elif show_extended_problems:
|
|
|
|
|
problems[domain] = "The certificate is valid for at least another 30 days --- no need to replace."
|
|
|
|
|
# Check for a good existing cert.
|
|
|
|
|
existing_cert = get_domain_ssl_files(domain, existing_certs, env, use_main_cert=False)
|
|
|
|
|
if existing_cert:
|
|
|
|
|
existing_cert_check = check_certificate(domain, existing_cert['certificate'], existing_cert['private-key'],
|
|
|
|
|
warn_if_expiring_soon=14)
|
|
|
|
|
if existing_cert_check[0] == "OK":
|
|
|
|
|
if show_valid_certs:
|
|
|
|
|
domains_cant_provision[domain] = "The domain has a valid certificate already. ({} Certificate: {}, private key {})".format(
|
|
|
|
|
existing_cert_check[1],
|
|
|
|
|
existing_cert['certificate'],
|
|
|
|
|
existing_cert['private-key'])
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Warn the user about domains hosted elsewhere.
|
|
|
|
|
if not force_domains and show_extended_problems:
|
|
|
|
|
for domain in set(get_web_domains(env, exclude_dns_elsewhere=False)) - set(get_web_domains(env)):
|
|
|
|
|
problems[domain] = "The domain's DNS is pointed elsewhere, so there is no point to installing a TLS certificate here and we could not automatically provision one anyway because provisioning requires access to the website (which isn't here)."
|
|
|
|
|
domains_to_provision.add(domain)
|
|
|
|
|
|
|
|
|
|
# Filter out domains that we can't provision a certificate for.
|
|
|
|
|
def can_provision_for_domain(domain):
|
|
|
|
|
from status_checks import query_dns, normalize_ip
|
|
|
|
|
|
|
|
|
|
# Does the domain resolve to this machine in public DNS? If not,
|
|
|
|
|
# we can't do domain control validation. For IPv6 is configured,
|
|
|
|
|
# make sure both IPv4 and IPv6 are correct because we don't know
|
|
|
|
|
# how Let's Encrypt will connect.
|
|
|
|
|
for rtype, value in [("A", env["PUBLIC_IP"]), ("AAAA", env.get("PUBLIC_IPV6"))]:
|
|
|
|
|
if not value: continue # IPv6 is not configured
|
|
|
|
|
response = query_dns(domain, rtype)
|
|
|
|
|
if response != normalize_ip(value):
|
|
|
|
|
problems[domain] = "The domain name does not resolve to this machine: DNS %s resolved to %s." % (rtype, response)
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
domains = set(filter(can_provision_for_domain, domains))
|
|
|
|
|
|
|
|
|
|
# If there are any domains we definitely will provision for, add in
|
|
|
|
|
# additional domains to do at this time.
|
|
|
|
|
if len(domains) > 0:
|
|
|
|
|
domains |= set(filter(can_provision_for_domain, domains_if_any))
|
|
|
|
|
|
|
|
|
|
return (domains, problems)
|
|
|
|
|
|
|
|
|
|
def provision_certificates(env, agree_to_tos_url=None, logger=None, show_extended_problems=True, force_domains=None, jsonable=False):
|
|
|
|
|
import requests.exceptions
|
|
|
|
|
import acme.messages
|
|
|
|
|
|
|
|
|
|
from free_tls_certificates import client
|
|
|
|
|
return (domains_to_provision, domains_cant_provision)
|
|
|
|
|
|
|
|
|
|
def provision_certificates(env, limit_domains):
|
|
|
|
|
# What domains should we provision certificates for? And what
|
|
|
|
|
# errors prevent provisioning for other domains.
|
|
|
|
|
domains, problems = get_certificates_to_provision(env, force_domains=force_domains, show_extended_problems=show_extended_problems)
|
|
|
|
|
domains, domains_cant_provision = get_certificates_to_provision(env, limit_domains=limit_domains)
|
|
|
|
|
|
|
|
|
|
# Build a list of what happened on each domain or domain-set.
|
|
|
|
|
ret = []
|
|
|
|
|
for domain, error in domains_cant_provision.items():
|
|
|
|
|
ret.append({
|
|
|
|
|
"domains": [domain],
|
|
|
|
|
"log": [error],
|
|
|
|
|
"result": "skipped",
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
# Exit fast if there is nothing to do.
|
|
|
|
|
if len(domains) == 0:
|
|
|
|
|
return {
|
|
|
|
|
"requests": [],
|
|
|
|
|
"problems": problems,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Break into groups of up to 100 certificates at a time, which is Let's Encrypt's
|
|
|
|
|
# limit for a single certificate. We'll sort to put related domains together.
|
|
|
|
|
max_domains_per_group = 100
|
|
|
|
|
domains = sort_domains(domains, env)
|
|
|
|
|
certs = []
|
|
|
|
|
while len(domains) > 0:
|
|
|
|
|
certs.append( domains[0:100] )
|
|
|
|
|
domains = domains[100:]
|
|
|
|
|
certs.append( domains[:max_domains_per_group] )
|
|
|
|
|
domains = domains[max_domains_per_group:]
|
|
|
|
|
|
|
|
|
|
# Prepare to provision.
|
|
|
|
|
|
|
|
|
|
@@ -269,115 +272,82 @@ def provision_certificates(env, agree_to_tos_url=None, logger=None, show_extende
|
|
|
|
|
if not os.path.exists(account_path):
|
|
|
|
|
os.mkdir(account_path)
|
|
|
|
|
|
|
|
|
|
# Where should we put ACME challenge files. This is mapped to /.well-known/acme_challenge
|
|
|
|
|
# by the nginx configuration.
|
|
|
|
|
challenges_path = os.path.join(account_path, 'acme_challenges')
|
|
|
|
|
if not os.path.exists(challenges_path):
|
|
|
|
|
os.mkdir(challenges_path)
|
|
|
|
|
|
|
|
|
|
# Read in the private key that we use for all TLS certificates. We'll need that
|
|
|
|
|
# to generate a CSR (done by free_tls_certificates).
|
|
|
|
|
with open(os.path.join(env['STORAGE_ROOT'], 'ssl/ssl_private_key.pem'), 'rb') as f:
|
|
|
|
|
private_key = f.read()
|
|
|
|
|
|
|
|
|
|
# Provision certificates.
|
|
|
|
|
|
|
|
|
|
ret = []
|
|
|
|
|
for domain_list in certs:
|
|
|
|
|
# For return.
|
|
|
|
|
ret_item = {
|
|
|
|
|
ret.append({
|
|
|
|
|
"domains": domain_list,
|
|
|
|
|
"log": [],
|
|
|
|
|
}
|
|
|
|
|
ret.append(ret_item)
|
|
|
|
|
|
|
|
|
|
# Logging for free_tls_certificates.
|
|
|
|
|
def my_logger(message):
|
|
|
|
|
if logger: logger(message)
|
|
|
|
|
ret_item["log"].append(message)
|
|
|
|
|
|
|
|
|
|
# Attempt to provision a certificate.
|
|
|
|
|
})
|
|
|
|
|
try:
|
|
|
|
|
try:
|
|
|
|
|
cert = client.issue_certificate(
|
|
|
|
|
domain_list,
|
|
|
|
|
account_path,
|
|
|
|
|
agree_to_tos_url=agree_to_tos_url,
|
|
|
|
|
private_key=private_key,
|
|
|
|
|
logger=my_logger)
|
|
|
|
|
# Create a CSR file for our master private key so that certbot
|
|
|
|
|
# uses our private key.
|
|
|
|
|
key_file = os.path.join(env['STORAGE_ROOT'], 'ssl', 'ssl_private_key.pem')
|
|
|
|
|
with tempfile.NamedTemporaryFile() as csr_file:
|
|
|
|
|
# We could use openssl, but certbot requires
|
|
|
|
|
# that the CN domain and SAN domains match
|
|
|
|
|
# the domain list passed to certbot, and adding
|
|
|
|
|
# SAN domains openssl req is ridiculously complicated.
|
|
|
|
|
# subprocess.check_output([
|
|
|
|
|
# "openssl", "req", "-new",
|
|
|
|
|
# "-key", key_file,
|
|
|
|
|
# "-out", csr_file.name,
|
|
|
|
|
# "-subj", "/CN=" + domain_list[0],
|
|
|
|
|
# "-sha256" ])
|
|
|
|
|
from cryptography import x509
|
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
from cryptography.hazmat.primitives.serialization import Encoding
|
|
|
|
|
from cryptography.hazmat.primitives import hashes
|
|
|
|
|
from cryptography.x509.oid import NameOID
|
|
|
|
|
builder = x509.CertificateSigningRequestBuilder()
|
|
|
|
|
builder = builder.subject_name(x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, domain_list[0]) ]))
|
|
|
|
|
builder = builder.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
|
|
|
|
|
builder = builder.add_extension(x509.SubjectAlternativeName(
|
|
|
|
|
[x509.DNSName(d) for d in domain_list]
|
|
|
|
|
), critical=False)
|
|
|
|
|
request = builder.sign(load_pem(load_cert_chain(key_file)[0]), hashes.SHA256(), default_backend())
|
|
|
|
|
with open(csr_file.name, "wb") as f:
|
|
|
|
|
f.write(request.public_bytes(Encoding.PEM))
|
|
|
|
|
|
|
|
|
|
except client.NeedToTakeAction as e:
|
|
|
|
|
# Write out the ACME challenge files.
|
|
|
|
|
for action in e.actions:
|
|
|
|
|
if isinstance(action, client.NeedToInstallFile):
|
|
|
|
|
fn = os.path.join(challenges_path, action.file_name)
|
|
|
|
|
with open(fn, 'w') as f:
|
|
|
|
|
f.write(action.contents)
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError(str(action))
|
|
|
|
|
# Provision, writing to a temporary file.
|
|
|
|
|
webroot = os.path.join(account_path, 'webroot')
|
|
|
|
|
os.makedirs(webroot, exist_ok=True)
|
|
|
|
|
with tempfile.TemporaryDirectory() as d:
|
|
|
|
|
cert_file = os.path.join(d, 'cert_and_chain.pem')
|
|
|
|
|
print("Provisioning TLS certificates for " + ", ".join(domain_list) + ".")
|
|
|
|
|
certbotret = subprocess.check_output([
|
|
|
|
|
"certbot",
|
|
|
|
|
"certonly",
|
|
|
|
|
#"-v", # just enough to see ACME errors
|
|
|
|
|
"--non-interactive", # will fail if user hasn't registered during Mail-in-a-Box setup
|
|
|
|
|
|
|
|
|
|
# Try to provision now that the challenge files are installed.
|
|
|
|
|
"-d", ",".join(domain_list), # first will be main domain
|
|
|
|
|
|
|
|
|
|
cert = client.issue_certificate(
|
|
|
|
|
domain_list,
|
|
|
|
|
account_path,
|
|
|
|
|
private_key=private_key,
|
|
|
|
|
logger=my_logger)
|
|
|
|
|
"--csr", csr_file.name, # use our private key; unfortunately this doesn't work with auto-renew so we need to save cert manually
|
|
|
|
|
"--cert-path", os.path.join(d, 'cert'), # we only use the full chain
|
|
|
|
|
"--chain-path", os.path.join(d, 'chain'), # we only use the full chain
|
|
|
|
|
"--fullchain-path", cert_file,
|
|
|
|
|
|
|
|
|
|
except client.NeedToAgreeToTOS as e:
|
|
|
|
|
# The user must agree to the Let's Encrypt terms of service agreement
|
|
|
|
|
# before any further action can be taken.
|
|
|
|
|
ret_item.update({
|
|
|
|
|
"result": "agree-to-tos",
|
|
|
|
|
"url": e.url,
|
|
|
|
|
})
|
|
|
|
|
"--webroot", "--webroot-path", webroot,
|
|
|
|
|
|
|
|
|
|
except client.WaitABit as e:
|
|
|
|
|
# We need to hold on for a bit before querying again to see if we can
|
|
|
|
|
# acquire a provisioned certificate.
|
|
|
|
|
import time, datetime
|
|
|
|
|
ret_item.update({
|
|
|
|
|
"result": "wait",
|
|
|
|
|
"until": e.until_when if not jsonable else e.until_when.isoformat(),
|
|
|
|
|
"seconds": (e.until_when - datetime.datetime.now()).total_seconds()
|
|
|
|
|
})
|
|
|
|
|
"--config-dir", account_path,
|
|
|
|
|
#"--staging",
|
|
|
|
|
], stderr=subprocess.STDOUT).decode("utf8")
|
|
|
|
|
install_cert_copy_file(cert_file, env)
|
|
|
|
|
|
|
|
|
|
except client.AccountDataIsCorrupt as e:
|
|
|
|
|
# This is an extremely rare condition.
|
|
|
|
|
ret_item.update({
|
|
|
|
|
"result": "error",
|
|
|
|
|
"message": "Something unexpected went wrong. It looks like your local Let's Encrypt account data is corrupted. There was a problem with the file " + e.account_file_path + ".",
|
|
|
|
|
})
|
|
|
|
|
ret[-1]["log"].append(certbotret)
|
|
|
|
|
ret[-1]["result"] = "installed"
|
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
|
|
|
ret[-1]["log"].append(e.output.decode("utf8"))
|
|
|
|
|
ret[-1]["result"] = "error"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
ret[-1]["log"].append(str(e))
|
|
|
|
|
ret[-1]["result"] = "error"
|
|
|
|
|
|
|
|
|
|
except (client.InvalidDomainName, client.NeedToTakeAction, client.ChallengeFailed, client.RateLimited, acme.messages.Error, requests.exceptions.RequestException) as e:
|
|
|
|
|
ret_item.update({
|
|
|
|
|
"result": "error",
|
|
|
|
|
"message": "Something unexpected went wrong: " + str(e),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
# A certificate was issued.
|
|
|
|
|
|
|
|
|
|
install_status = install_cert(domain_list[0], cert['cert'].decode("ascii"), b"\n".join(cert['chain']).decode("ascii"), env, raw=True)
|
|
|
|
|
|
|
|
|
|
# str indicates the certificate was not installed.
|
|
|
|
|
if isinstance(install_status, str):
|
|
|
|
|
ret_item.update({
|
|
|
|
|
"result": "error",
|
|
|
|
|
"message": "Something unexpected was wrong with the provisioned certificate: " + install_status,
|
|
|
|
|
})
|
|
|
|
|
else:
|
|
|
|
|
# A list indicates success and what happened next.
|
|
|
|
|
ret_item["log"].extend(install_status)
|
|
|
|
|
ret_item.update({
|
|
|
|
|
"result": "installed",
|
|
|
|
|
})
|
|
|
|
|
# Run post-install steps.
|
|
|
|
|
ret.extend(post_install_func(env))
|
|
|
|
|
|
|
|
|
|
# Return what happened with each certificate request.
|
|
|
|
|
return {
|
|
|
|
|
"requests": ret,
|
|
|
|
|
"problems": problems,
|
|
|
|
|
}
|
|
|
|
|
return ret
|
|
|
|
|
|
|
|
|
|
def provision_certificates_cmdline():
|
|
|
|
|
import sys
|
|
|
|
|
@@ -388,151 +358,39 @@ def provision_certificates_cmdline():
|
|
|
|
|
Lock(die=True).forever()
|
|
|
|
|
env = load_environment()
|
|
|
|
|
|
|
|
|
|
verbose = False
|
|
|
|
|
headless = False
|
|
|
|
|
force_domains = None
|
|
|
|
|
show_extended_problems = True
|
|
|
|
|
|
|
|
|
|
args = list(sys.argv)
|
|
|
|
|
args.pop(0) # program name
|
|
|
|
|
if args and args[0] == "-v":
|
|
|
|
|
verbose = True
|
|
|
|
|
args.pop(0)
|
|
|
|
|
if args and args[0] == "-q":
|
|
|
|
|
show_extended_problems = False
|
|
|
|
|
args.pop(0)
|
|
|
|
|
if args and args[0] == "--headless":
|
|
|
|
|
headless = True
|
|
|
|
|
args.pop(0)
|
|
|
|
|
if args and args[0] == "--force":
|
|
|
|
|
force_domains = "ALL"
|
|
|
|
|
args.pop(0)
|
|
|
|
|
else:
|
|
|
|
|
force_domains = args
|
|
|
|
|
quiet = False
|
|
|
|
|
domains = []
|
|
|
|
|
|
|
|
|
|
agree_to_tos_url = None
|
|
|
|
|
while True:
|
|
|
|
|
# Run the provisioning script. This installs certificates. If there are
|
|
|
|
|
# a very large number of domains on this box, it issues separate
|
|
|
|
|
# certificates for groups of domains. We have to check the result for
|
|
|
|
|
# each group.
|
|
|
|
|
def my_logger(message):
|
|
|
|
|
if verbose:
|
|
|
|
|
print(">", message)
|
|
|
|
|
status = provision_certificates(env, agree_to_tos_url=agree_to_tos_url, logger=my_logger, force_domains=force_domains, show_extended_problems=show_extended_problems)
|
|
|
|
|
agree_to_tos_url = None # reset to prevent infinite looping
|
|
|
|
|
for arg in sys.argv[1:]:
|
|
|
|
|
if arg == "-q":
|
|
|
|
|
quiet = True
|
|
|
|
|
else:
|
|
|
|
|
domains.append(arg)
|
|
|
|
|
|
|
|
|
|
if not status["requests"]:
|
|
|
|
|
# No domains need certificates.
|
|
|
|
|
if not headless or verbose:
|
|
|
|
|
if len(status["problems"]) == 0:
|
|
|
|
|
print("No domains hosted on this box need a new TLS certificate at this time.")
|
|
|
|
|
elif len(status["problems"]) > 0:
|
|
|
|
|
print("No TLS certificates could be provisoned at this time:")
|
|
|
|
|
print()
|
|
|
|
|
for domain in sort_domains(status["problems"], env):
|
|
|
|
|
print("%s: %s" % (domain, status["problems"][domain]))
|
|
|
|
|
# Go.
|
|
|
|
|
status = provision_certificates(env, limit_domains=domains)
|
|
|
|
|
|
|
|
|
|
sys.exit(0)
|
|
|
|
|
|
|
|
|
|
# What happened?
|
|
|
|
|
wait_until = None
|
|
|
|
|
wait_domains = []
|
|
|
|
|
for request in status["requests"]:
|
|
|
|
|
if request["result"] == "agree-to-tos":
|
|
|
|
|
# We may have asked already in a previous iteration.
|
|
|
|
|
if agree_to_tos_url is not None:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Can't ask the user a question in this mode. Warn the user that something
|
|
|
|
|
# needs to be done.
|
|
|
|
|
if headless:
|
|
|
|
|
print(", ".join(request["domains"]) + " need a new or renewed TLS certificate.")
|
|
|
|
|
print()
|
|
|
|
|
print("This box can't do that automatically for you until you agree to Let's Encrypt's")
|
|
|
|
|
print("Terms of Service agreement. Use the Mail-in-a-Box control panel to provision")
|
|
|
|
|
print("certificates for these domains.")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
print("""
|
|
|
|
|
I'm going to provision a TLS certificate (formerly called a SSL certificate)
|
|
|
|
|
for you from Let's Encrypt (letsencrypt.org).
|
|
|
|
|
|
|
|
|
|
TLS certificates are cryptographic keys that ensure communication between
|
|
|
|
|
you and this box are secure when getting and sending mail and visiting
|
|
|
|
|
websites hosted on this box. Let's Encrypt is a free provider of TLS
|
|
|
|
|
certificates.
|
|
|
|
|
|
|
|
|
|
Please open this document in your web browser:
|
|
|
|
|
|
|
|
|
|
%s
|
|
|
|
|
|
|
|
|
|
It is Let's Encrypt's terms of service agreement. If you agree, I can
|
|
|
|
|
provision that TLS certificate. If you don't agree, you will have an
|
|
|
|
|
opportunity to install your own TLS certificate from the Mail-in-a-Box
|
|
|
|
|
control panel.
|
|
|
|
|
|
|
|
|
|
Do you agree to the agreement? Type Y or N and press <ENTER>: """
|
|
|
|
|
% request["url"], end='', flush=True)
|
|
|
|
|
|
|
|
|
|
if sys.stdin.readline().strip().upper() != "Y":
|
|
|
|
|
print("\nYou didn't agree. Quitting.")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
# Okay, indicate agreement on next iteration.
|
|
|
|
|
agree_to_tos_url = request["url"]
|
|
|
|
|
|
|
|
|
|
if request["result"] == "wait":
|
|
|
|
|
# Must wait. We'll record until when. The wait occurs below.
|
|
|
|
|
if wait_until is None:
|
|
|
|
|
wait_until = request["until"]
|
|
|
|
|
else:
|
|
|
|
|
wait_until = max(wait_until, request["until"])
|
|
|
|
|
wait_domains += request["domains"]
|
|
|
|
|
|
|
|
|
|
if request["result"] == "error":
|
|
|
|
|
print(", ".join(request["domains"]) + ":")
|
|
|
|
|
print(request["message"])
|
|
|
|
|
|
|
|
|
|
if request["result"] == "installed":
|
|
|
|
|
print("A TLS certificate was successfully installed for " + ", ".join(request["domains"]) + ".")
|
|
|
|
|
|
|
|
|
|
if wait_until:
|
|
|
|
|
# Wait, then loop.
|
|
|
|
|
import time, datetime
|
|
|
|
|
# Show what happened.
|
|
|
|
|
for request in status:
|
|
|
|
|
if isinstance(request, str):
|
|
|
|
|
print(request)
|
|
|
|
|
else:
|
|
|
|
|
if quiet and request['result'] == 'skipped':
|
|
|
|
|
continue
|
|
|
|
|
print(request['result'] + ":", ", ".join(request['domains']) + ":")
|
|
|
|
|
for line in request["log"]:
|
|
|
|
|
print(line)
|
|
|
|
|
print()
|
|
|
|
|
print("A TLS certificate was requested for: " + ", ".join(wait_domains) + ".")
|
|
|
|
|
first = True
|
|
|
|
|
while wait_until > datetime.datetime.now():
|
|
|
|
|
if not headless or first:
|
|
|
|
|
print ("We have to wait", int(round((wait_until - datetime.datetime.now()).total_seconds())), "seconds for the certificate to be issued...")
|
|
|
|
|
time.sleep(10)
|
|
|
|
|
first = False
|
|
|
|
|
|
|
|
|
|
continue # Loop!
|
|
|
|
|
|
|
|
|
|
if agree_to_tos_url:
|
|
|
|
|
# The user agrees to the TOS. Loop to try again by agreeing.
|
|
|
|
|
continue # Loop!
|
|
|
|
|
|
|
|
|
|
# Unless we were instructed to wait, or we just agreed to the TOS,
|
|
|
|
|
# we're done for now.
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
# And finally show the domains with problems.
|
|
|
|
|
if len(status["problems"]) > 0:
|
|
|
|
|
print("TLS certificates could not be provisoned for:")
|
|
|
|
|
for domain in sort_domains(status["problems"], env):
|
|
|
|
|
print("%s: %s" % (domain, status["problems"][domain]))
|
|
|
|
|
|
|
|
|
|
# INSTALLING A NEW CERTIFICATE FROM THE CONTROL PANEL
|
|
|
|
|
|
|
|
|
|
def create_csr(domain, ssl_key, country_code, env):
|
|
|
|
|
return shell("check_output", [
|
|
|
|
|
"openssl", "req", "-new",
|
|
|
|
|
"-key", ssl_key,
|
|
|
|
|
"-sha256",
|
|
|
|
|
"-subj", "/C=%s/CN=%s" % (country_code, domain)])
|
|
|
|
|
"openssl", "req", "-new",
|
|
|
|
|
"-key", ssl_key,
|
|
|
|
|
"-sha256",
|
|
|
|
|
"-subj", "/C=%s/CN=%s" % (country_code, domain)])
|
|
|
|
|
|
|
|
|
|
def install_cert(domain, ssl_cert, ssl_chain, env, raw=False):
|
|
|
|
|
# Write the combined cert+chain to a temporary path and validate that it is OK.
|
|
|
|
|
@@ -553,6 +411,16 @@ def install_cert(domain, ssl_cert, ssl_chain, env, raw=False):
|
|
|
|
|
cert_status += " " + cert_status_details
|
|
|
|
|
return cert_status
|
|
|
|
|
|
|
|
|
|
# Copy certifiate into ssl directory.
|
|
|
|
|
install_cert_copy_file(fn, env)
|
|
|
|
|
|
|
|
|
|
# Run post-install steps.
|
|
|
|
|
ret = post_install_func(env)
|
|
|
|
|
if raw: return ret
|
|
|
|
|
return "\n".join(ret)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def install_cert_copy_file(fn, env):
|
|
|
|
|
# Where to put it?
|
|
|
|
|
# Make a unique path for the certificate.
|
|
|
|
|
from cryptography.hazmat.primitives import hashes
|
|
|
|
|
@@ -570,14 +438,26 @@ def install_cert(domain, ssl_cert, ssl_chain, env, raw=False):
|
|
|
|
|
os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)
|
|
|
|
|
shutil.move(fn, ssl_certificate)
|
|
|
|
|
|
|
|
|
|
ret = ["OK"]
|
|
|
|
|
|
|
|
|
|
# When updating the cert for PRIMARY_HOSTNAME, symlink it from the system
|
|
|
|
|
def post_install_func(env):
|
|
|
|
|
ret = []
|
|
|
|
|
|
|
|
|
|
# Get the certificate to use for PRIMARY_HOSTNAME.
|
|
|
|
|
ssl_certificates = get_ssl_certificates(env)
|
|
|
|
|
cert = get_domain_ssl_files(env['PRIMARY_HOSTNAME'], ssl_certificates, env, use_main_cert=False)
|
|
|
|
|
if not cert:
|
|
|
|
|
# Ruh-row, we don't have any certificate usable
|
|
|
|
|
# for the primary hostname.
|
|
|
|
|
ret.append("there is no valid certificate for " + env['PRIMARY_HOSTNAME'])
|
|
|
|
|
|
|
|
|
|
# Symlink the best cert for PRIMARY_HOSTNAME to the system
|
|
|
|
|
# certificate path, which is hard-coded for various purposes, and then
|
|
|
|
|
# restart postfix and dovecot.
|
|
|
|
|
if domain == env['PRIMARY_HOSTNAME']:
|
|
|
|
|
system_ssl_certificate = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem'))
|
|
|
|
|
if cert and os.readlink(system_ssl_certificate) != cert['certificate']:
|
|
|
|
|
# Update symlink.
|
|
|
|
|
system_ssl_certificate = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_certificate.pem'))
|
|
|
|
|
ret.append("updating primary certificate")
|
|
|
|
|
ssl_certificate = cert['certificate']
|
|
|
|
|
os.unlink(system_ssl_certificate)
|
|
|
|
|
os.symlink(ssl_certificate, system_ssl_certificate)
|
|
|
|
|
|
|
|
|
|
@@ -593,12 +473,12 @@ def install_cert(domain, ssl_cert, ssl_chain, env, raw=False):
|
|
|
|
|
# Update the web configuration so nginx picks up the new certificate file.
|
|
|
|
|
from web_update import do_web_update
|
|
|
|
|
ret.append( do_web_update(env) )
|
|
|
|
|
if raw: return ret
|
|
|
|
|
return "\n".join(ret)
|
|
|
|
|
|
|
|
|
|
return ret
|
|
|
|
|
|
|
|
|
|
# VALIDATION OF CERTIFICATES
|
|
|
|
|
|
|
|
|
|
def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring_soon=True, rounded_time=False, just_check_domain=False):
|
|
|
|
|
def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring_soon=10, rounded_time=False, just_check_domain=False):
|
|
|
|
|
# Check that the ssl_certificate & ssl_private_key files are good
|
|
|
|
|
# for the provided domain.
|
|
|
|
|
|
|
|
|
|
@@ -704,7 +584,7 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
|
|
|
|
|
# We'll renew it with Lets Encrypt.
|
|
|
|
|
expiry_info = "The certificate expires on %s." % cert_expiration_date.strftime("%x")
|
|
|
|
|
|
|
|
|
|
if ndays <= 10 and warn_if_expiring_soon:
|
|
|
|
|
if warn_if_expiring_soon and ndays <= warn_if_expiring_soon:
|
|
|
|
|
# Warn on day 10 to give 4 days for us to automatically renew the
|
|
|
|
|
# certificate, which occurs on day 14.
|
|
|
|
|
return ("The certificate is expiring soon: " + expiry_info, None)
|
|
|
|
|
|