1
0
mirror of https://github.com/mail-in-a-box/mailinabox.git synced 2024-11-23 02:27:05 +00:00

Added key rollover code.

This commit is contained in:
Ashiq5 2020-11-04 20:25:25 +06:00
parent 94aab7c5e2
commit e6657d6ebe
6 changed files with 228 additions and 29 deletions

View File

@ -364,6 +364,77 @@ def ssl_get_csr(domain):
ssl_private_key = os.path.join(os.path.join(env["STORAGE_ROOT"], 'ssl', 'ssl_private_key.pem'))
return create_csr(domain, ssl_private_key, request.form.get('countrycode', ''), env)
@app.route('/ssl/renew/<domain>', methods=['POST'])
@authorized_personnel_only
def ssl_renew(domain):
from exclusiveprocess import Lock
from utils import load_environment
from ssl_certificates import provision_certificates
existing_key = request.form.get('existing_key')
Lock(die=True).forever()
env = load_environment()
if existing_key == "yes":
status = provision_certificates(env, limit_domains=[], domain_to_be_renewed=domain)
app.logger.warning("renew without new key=", status) # TODO: remove this line after testing
elif existing_key == "no":
import glob
try:
# steps followed
# 1. take a backup of the current /home/user-data/ssl/ folder to be safe
# 2. renew all the existing certificates from CSR generated from the existing next_ssl_private_key
# 3. if the renew is successful, replace the current ssl_private_key with the next_ssl_private_key and
# 4. generate the next_ssl_private_key
# 5. if any error occurs, copy everything from the /home/user-data/ssl-backup folder to /home/user-data/ssl
# step 1
files = glob.glob(env["STORAGE_ROOT"] + "/ssl/*")
for file in files:
subprocess.check_output(["cp", "-r", file, env["STORAGE_ROOT"] + "/ssl-backup/"])
# step 2
status = provision_certificates(env, limit_domains=[], new_key=True)
# step 3 and 4 is in post_install_func method of ssl_certificates.py
app.logger.warning("renew with new key=", status) # TODO: remove this line after proper testing
except Exception as e:
import traceback
files = glob.glob(env["STORAGE_ROOT"] + "/ssl-backup/*")
for file in files:
subprocess.check_output(["cp", "-r", file, env["STORAGE_ROOT"] + "/ssl/"])
app.logger.warning(traceback.print_exc()) # TODO: remove this line after proper testing
return json_response({
"title": "Error",
"log": "Sorry, something is not right!",
})
else:
return json_response({
"title": "Error",
"log": "Sorry, something is not right!",
})
for item in status:
if isinstance(status, str):
continue
else:
if domain in item['domains']:
if item['result'] == 'skipped':
return json_response({
"title": item["result"].capitalize(),
"log": "\n".join(item['log']),
})
elif item['result'] == 'installed':
return json_response({
"title": item["result"].capitalize(),
"log": "Your certificate containing these domains " + ",".join(
item['domains']) + " have been renewed",
})
else:
return json_response({
"title": item["result"].capitalize(),
"log": "\n".join(item['log'])
})
@app.route('/ssl/install', methods=['POST'])
@authorized_personnel_only
def ssl_install_cert():
@ -604,7 +675,8 @@ def log_failed_login(request):
# APP
if __name__ == '__main__':
if "DEBUG" in os.environ: app.debug = True
app.debug = True # TODO: remove this line and uncomment the next line after testing
# if "DEBUG" in os.environ: app.debug = True
if "APIKEY" in os.environ: auth_service.key = os.environ["APIKEY"]
if not app.debug:

View File

@ -174,9 +174,11 @@ def build_zone(domain, all_domains, additional_records, www_redirect_domains, en
# Add a DANE TLSA record for SMTP.
records.append(("_25._tcp", "TLSA", build_tlsa_record(env), "Recommended when DNSSEC is enabled. Advertises to mail servers connecting to the box that mandatory encryption should be used."))
records.append(("_25._tcp", "TLSA", build_tlsa_record(env, from_cert=False), "Recommended when DNSSEC is enabled. Advertises to mail servers connecting to the box that mandatory encryption should be used."))
# Add a DANE TLSA record for HTTPS, which some browser extensions might make use of.
records.append(("_443._tcp", "TLSA", build_tlsa_record(env), "Optional. When DNSSEC is enabled, provides out-of-band HTTPS certificate validation for a few web clients that support it."))
records.append(("_443._tcp", "TLSA", build_tlsa_record(env, from_cert=False), "Optional. When DNSSEC is enabled, provides out-of-band HTTPS certificate validation for a few web clients that support it."))
# Add a SSHFP records to help SSH key validation. One per available SSH key on this system.
for value in build_sshfp_records():
@ -367,7 +369,7 @@ def build_zone(domain, all_domains, additional_records, www_redirect_domains, en
########################################################################
def build_tlsa_record(env):
def build_tlsa_record(env, from_cert=True):
# A DANE TLSA record in DNS specifies that connections on a port
# must use TLS and the certificate must match a particular criteria.
#
@ -390,11 +392,16 @@ def build_tlsa_record(env):
from ssl_certificates import load_cert_chain, load_pem
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
fn = os.path.join(env["STORAGE_ROOT"], "ssl", "ssl_certificate.pem")
cert = load_pem(load_cert_chain(fn)[0])
subject_public_key = cert.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
# We could have also loaded ssl_private_key.pem and called priv_key.public_key().public_bytes(...)
if from_cert:
fn = os.path.join(env["STORAGE_ROOT"], "ssl", "ssl_certificate.pem")
cert = load_pem(load_cert_chain(fn)[0])
subject_public_key = cert.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
else:
# this is for Double TLSA scheme of key rollover.
# More details here (https://mail.sys4.de/pipermail/dane-users/2018-February/000440.html)
fn = os.path.join(env["STORAGE_ROOT"], "ssl", "next_ssl_private_key.pem")
private_key = load_pem(open(fn, 'rb').read())
subject_public_key = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
pk_hash = hashlib.sha256(subject_public_key).hexdigest()
@ -862,6 +869,8 @@ def set_custom_dns_record(qname, rtype, value, action, env):
if not re.search(DOMAIN_RE, value):
raise ValueError("Invalid value.")
elif rtype == "TLSA":
pass
elif rtype in ("CNAME", "TXT", "SRV", "MX", "SSHFP", "CAA"):
# anything goes
pass

View File

@ -45,7 +45,7 @@ def get_ssl_certificates(env):
# Remember stuff.
private_keys = { }
certificates = [ ]
certificates = []
# Scan each of the files to find private keys and certificates.
# We must load all of the private keys first before processing
@ -73,6 +73,7 @@ def get_ssl_certificates(env):
domains = { }
for cert in certificates:
# What domains is this certificate good for?
# cert_domains = cert common name + all SANs, primary_domain = cert common name
cert_domains, primary_domain = get_certificate_domains(cert)
cert._primary_domain = primary_domain
@ -186,8 +187,16 @@ def get_certificates_to_provision(env, limit_domains=None, show_valid_certs=True
from web_update import get_web_domains
from status_checks import query_dns, normalize_ip
# existing_certs = all valid certificates that are in /home/user-data/ssl or 1 level deep
# validity indicator -> private key exists for the cert public key?, not_expired?
# if multiple valid certs exist, then the one with the furthest expiry date and filename
# lexicographically smallest is returned
existing_certs = get_ssl_certificates(env)
# this function returns the list of domain names of all the email addresses and adds
# autoconfig, www, mta-sts, autodiscover subdomain to each of these domains
# if exclude_dns_elsewhere flag is set, then all the domains having A/AAAA record not on this machine
# are excluded
plausible_web_domains = get_web_domains(env, exclude_dns_elsewhere=False)
actual_web_domains = get_web_domains(env)
@ -212,7 +221,8 @@ def get_certificates_to_provision(env, limit_domains=None, show_valid_certs=True
# how Let's Encrypt will connect.
bad_dns = []
for rtype, value in [("A", env["PUBLIC_IP"]), ("AAAA", env.get("PUBLIC_IPV6"))]:
if not value: continue # IPv6 is not configured
if not value:
continue # IPv6 is not configured
response = query_dns(domain, rtype)
if response != normalize_ip(value):
bad_dns.append("%s (%s)" % (response, rtype))
@ -226,6 +236,7 @@ def get_certificates_to_provision(env, limit_domains=None, show_valid_certs=True
# DNS is all good.
# Check for a good existing cert.
# existing_cert = existing cert for domain
existing_cert = get_domain_ssl_files(domain, existing_certs, env, use_main_cert=False, allow_missing_cert=True)
if existing_cert:
existing_cert_check = check_certificate(domain, existing_cert['certificate'], existing_cert['private-key'],
@ -242,19 +253,30 @@ def get_certificates_to_provision(env, limit_domains=None, show_valid_certs=True
return (domains_to_provision, domains_cant_provision)
def provision_certificates(env, limit_domains):
def provision_certificates(env, limit_domains, domain_to_be_renewed=None, new_key=False):
# What domains should we provision certificates for? And what
# errors prevent provisioning for other domains.
domains, domains_cant_provision = get_certificates_to_provision(env, limit_domains=limit_domains)
# Build a list of what happened on each domain or domain-set.
ret = []
for domain, error in domains_cant_provision.items():
ret.append({
"domains": [domain],
"log": [error],
"result": "skipped",
})
is_tlsa_update_required = False
if new_key:
from web_update import get_web_domains
domains = get_web_domains(env)
elif domain_to_be_renewed:
existing_certs = get_ssl_certificates(env)
existing_cert = get_domain_ssl_files(domain_to_be_renewed, existing_certs, env, use_main_cert=False, allow_missing_cert=True)
domains, primary_domain = get_certificate_domains(load_pem(load_cert_chain(existing_cert["certificate"])[0]))
else:
# domains = domains for which a certificate can be provisioned
# domains_cant_provision = domains for which a certificate can't be provisioned and the reason
domains, domains_cant_provision = get_certificates_to_provision(env, limit_domains=limit_domains)
# Build a list of what happened on each domain or domain-set.
for domain, error in domains_cant_provision.items():
ret.append({
"domains": [domain],
"log": [error],
"result": "skipped",
})
# Break into groups by DNS zone: Group every domain with its parent domain, if
# its parent domain is in the list of domains to request a certificate for.
@ -309,6 +331,8 @@ def provision_certificates(env, limit_domains):
# Create a CSR file for our master private key so that certbot
# uses our private key.
key_file = os.path.join(env['STORAGE_ROOT'], 'ssl', 'ssl_private_key.pem')
if new_key:
key_file = os.path.join(env['STORAGE_ROOT'], 'ssl', 'next_ssl_private_key.pem')
with tempfile.NamedTemporaryFile() as csr_file:
# We could use openssl, but certbot requires
# that the CN domain and SAN domains match
@ -345,9 +369,9 @@ def provision_certificates(env, limit_domains):
"certbot",
"certonly",
#"-v", # just enough to see ACME errors
"--non-interactive", # will fail if user hasn't registered during Mail-in-a-Box setup
"--non-interactive", # will fail if user hasn't registered during Mail-in-a-Box setup
"-d", ",".join(domain_list), # first will be main domain
"-d", ",".join(domain_list), # first will be main domain
"--csr", csr_file.name, # use our private key; unfortunately this doesn't work with auto-renew so we need to save cert manually
"--cert-path", os.path.join(d, 'cert'), # we only use the full chain
@ -363,6 +387,8 @@ def provision_certificates(env, limit_domains):
ret[-1]["log"].append(certbotret)
ret[-1]["result"] = "installed"
if new_key and env['PRIMARY_HOSTNAME'] in domains:
is_tlsa_update_required = True
except subprocess.CalledProcessError as e:
ret[-1]["log"].append(e.output.decode("utf8"))
ret[-1]["result"] = "error"
@ -371,7 +397,7 @@ def provision_certificates(env, limit_domains):
ret[-1]["result"] = "error"
# Run post-install steps.
ret.extend(post_install_func(env))
ret.extend(post_install_func(env, is_tlsa_update_required=is_tlsa_update_required))
# Return what happened with each certificate request.
return ret
@ -466,7 +492,7 @@ def install_cert_copy_file(fn, env):
shutil.move(fn, ssl_certificate)
def post_install_func(env):
def post_install_func(env, is_tlsa_update_required=False):
ret = []
# Get the certificate to use for PRIMARY_HOSTNAME.
@ -496,6 +522,25 @@ def post_install_func(env):
# The DANE TLSA record will remain valid so long as the private key
# hasn't changed. We don't ever change the private key automatically.
# If the user does it, they must manually update DNS.
if is_tlsa_update_required:
from dns_update import do_dns_update, set_custom_dns_record, build_tlsa_record
subprocess.check_output([
"mv", env["STORAGE_ROOT"] + "/ssl/next_ssl_private_key.pem",
env["STORAGE_ROOT"] + "/ssl/ssl_private_key.pem"
])
subprocess.check_output([
"openssl", "genrsa",
"-out", env["STORAGE_ROOT"] + "/ssl/next_ssl_private_key.pem",
"2048"])
qname1 = "_25._tcp." + env['PRIMARY_HOSTNAME']
qname2 = "_443._tcp." + env['PRIMARY_HOSTNAME']
rtype = "TLSA"
value = build_tlsa_record(env, from_cert=False)
action = "add"
if set_custom_dns_record(qname1, rtype, value, action, env):
set_custom_dns_record(qname2, rtype, value, action, env)
ret.append(do_dns_update(env))
# Update the web configuration so nginx picks up the new certificate file.
from web_update import do_web_update

View File

@ -454,9 +454,9 @@ def check_primary_hostname_dns(domain, env, output, dns_domains, dns_zonefiles):
# Check the TLSA record.
tlsa_qname = "_25._tcp." + domain
tlsa25 = query_dns(tlsa_qname, "TLSA", nxdomain=None)
tlsa25 = query_dns(tlsa_qname, "TLSA", nxdomain=None).split('; ')
tlsa25_expected = build_tlsa_record(env)
if tlsa25 == tlsa25_expected:
if tlsa25_expected in tlsa25:
output.print_ok("""The DANE TLSA record for incoming mail is correct (%s).""" % tlsa_qname,)
elif tlsa25 is None:
if has_dnssec:

View File

@ -40,7 +40,10 @@
<h3 id="ssl_install_header">Install certificate</h3>
<p>If you don't want to use our automatic Let's Encrypt integration, you can give any other certificate provider a try. You can generate the needed CSR below.</p>
<p>If you don't want to use our automatic Let's Encrypt integration, you can give any other certificate provider a try. Click on install certificate button
if there is no certificate for your intended domain or
click on renew or replace certificate button and click replace if there is an existing certificate and you want to replace it with a new one from a different CA.
You can generate the needed CSR below.</p>
<p>Which domain are you getting a certificate for?</p>
@ -101,7 +104,8 @@ function show_tls(keep_provisioning_shown) {
$('#ssldomain').html('<option value="">(select)</option>');
$('#ssl_domains').show();
for (var i = 0; i < domains.length; i++) {
var row = $("<tr><th class='domain'><a href=''></a></th><td class='status'></td> <td class='actions'><a href='#' onclick='return ssl_install(this);' class='btn btn-xs'>Install Certificate</a></td></tr>");
var row = $("<tr><th class='domain'><a href=''></a></th><td class='status'></td> " +
"<td class='actions'><a href='#' onclick='return ssl_install(this);' class='btn btn-xs'>Install Certificate</a></td></tr>");
tb.append(row);
row.attr('data-domain', domains[i].domain);
row.find('.domain a').text(domains[i].domain);
@ -113,7 +117,10 @@ function show_tls(keep_provisioning_shown) {
row.addClass("text-" + domains[i].status);
row.find('.status').text(domains[i].text);
if (domains[i].status == "success") {
row.find('.actions a').addClass('btn-default').text('Replace Certificate');
row.find('.actions a').addClass('btn-default').text('Renew or replace Certificate');
row.find('.actions a').addClass('btn-default').on("click", function () {
ssl_renew_or_replace_modal(this);
});
} else {
row.find('.actions a').addClass('btn-primary').text('Install Certificate');
}
@ -131,6 +138,65 @@ function ssl_install(elem) {
return false;
}
function ssl_renew_or_replace_modal(elem) {
show_modal_confirm(
"Options",
"Do you want to replace the certificate with a new one or just renew this one?",
["Replace", "Renew"],
function () {
ssl_install(elem);
},
function () {
ssl_cert_renew(elem);
});
}
function ssl_cert_renew(elem) {
var domain = $(elem).parents('tr').attr('data-domain');
show_modal_confirm(
"Options",
"Do you want to renew with the existing key?",
["Yes", "No"],
function () {
ajax_with_indicator(true);
api(
"/ssl/renew/" + domain,
"POST",
{
existing_key: "yes"
},
function(data) {
$('#ajax_loading_indicator').stop(true).hide();
show_modal_error(data["title"], data["log"]);
show_tls(true);
},
function () {
$('#ajax_loading_indicator').stop(true).hide();
show_modal_error("Error", "Something is not right, sorry!");
show_tls(true);
});
},
function () {
ajax_with_indicator(true);
api(
"/ssl/renew/" + domain,
"POST",
{
existing_key: "no"
},
function(data) {
$('#ajax_loading_indicator').stop(true).hide();
show_modal_error(data["title"], data["log"]);
show_tls(true);
},
function () {
$('#ajax_loading_indicator').stop(true).hide();
show_modal_error("Error", "Something is not right, sorry!");
show_tls(true);
}
);
});
}
function show_csr() {
// Can't show a CSR until both inputs are entered.
if ($('#ssldomain').val() == "") return;

View File

@ -19,7 +19,7 @@
#
# The Diffie-Hellman cipher bits are used for SMTP and HTTPS, when a
# Diffie-Hellman cipher is selected during TLS negotiation. Diffie-Hellman
# provides Perfect Forward Secrecy.
# provides Perfect Forward Secrecy.
source setup/functions.sh # load our functions
source /etc/mailinabox.conf # load global vars
@ -66,6 +66,13 @@ if [ ! -f $STORAGE_ROOT/ssl/ssl_private_key.pem ]; then
openssl genrsa -out $STORAGE_ROOT/ssl/ssl_private_key.pem 2048)
fi
# for Double TLSA scheme. More details here (https://mail.sys4.de/pipermail/dane-users/2018-February/000440.html)
if [ ! -f $STORAGE_ROOT/ssl/next_ssl_private_key.pem ]; then
# Set the umask so the key file is never world-readable.
(umask 077; hide_output \
openssl genrsa -out $STORAGE_ROOT/ssl/next_ssl_private_key.pem 2048)
fi
# Generate a self-signed SSL certificate because things like nginx, dovecot,
# etc. won't even start without some certificate in place, and we need nginx
# so we can offer the user a control panel to install a better certificate.