mirror of
https://github.com/mail-in-a-box/mailinabox.git
synced 2026-03-04 15:54:48 +01:00
first pass at a management tool for checking what the user must do to finish his configuration: set NS records, DS records, sign his certificates, etc.
This commit is contained in:
220
management/check_configuration.py
Normal file
220
management/check_configuration.py
Normal file
@@ -0,0 +1,220 @@
|
||||
# Checks that the upstream DNS has been set correctly and that
|
||||
# SSL certificates have been signed, and if not tells the user
|
||||
# what to do next.
|
||||
|
||||
import os, os.path, re, subprocess
|
||||
|
||||
import dns.reversename, dns.resolver
|
||||
|
||||
from dns_update import get_dns_zones
|
||||
from web_update import get_web_domains, get_domain_ssl_files
|
||||
|
||||
from utils import shell, sort_domains
|
||||
|
||||
def run_checks(env):
|
||||
# Get the list of domains we serve DNS zones for (i.e. does not include subdomains).
|
||||
dns_zonefiles = dict(get_dns_zones(env))
|
||||
dns_domains = set(dns_zonefiles)
|
||||
|
||||
# Get the list of domains we serve HTTPS for.
|
||||
web_domains = set(get_web_domains(env))
|
||||
|
||||
# Check the domains.
|
||||
for domain in sort_domains(dns_domains | web_domains, env):
|
||||
print(domain)
|
||||
print("=" * len(domain))
|
||||
if domain == env["PUBLIC_HOSTNAME"]: check_primary_hostname_dns(domain, env)
|
||||
if domain in dns_domains: check_dns_zone(domain, env, dns_zonefiles)
|
||||
check_mx(domain, env)
|
||||
check_ssl_cert(domain, env)
|
||||
print()
|
||||
|
||||
def check_primary_hostname_dns(domain, env):
|
||||
# Check that the ns1/ns2 hostnames resolve to A records. This information probably
|
||||
# comes from the TLD since the information is set at the registrar.
|
||||
ip = query_dns("ns1." + domain, "A") + '/' + query_dns("ns2." + domain, "A")
|
||||
if ip == env['PUBLIC_IP'] + '/' + env['PUBLIC_IP']:
|
||||
print_ok("Nameserver IPs are correct at registrar. [ns1/ns2.%s => %s]" % (env['PUBLIC_HOSTNAME'], env['PUBLIC_IP']))
|
||||
else:
|
||||
print_error("""Nameserver IP addresses are incorrect. The ns1.%s and ns2.%s nameservers must be configured at your domain name
|
||||
registrar as having the IP address %s. They currently report addresses of %s. It may take several hours for
|
||||
public DNS to update after a change."""
|
||||
% (env['PUBLIC_HOSTNAME'], env['PUBLIC_HOSTNAME'], env['PUBLIC_IP'], ip))
|
||||
|
||||
# Check that PUBLIC_HOSTNAME resolves to PUBLIC_IP in public DNS.
|
||||
ip = query_dns(domain, "A")
|
||||
if ip == env['PUBLIC_IP']:
|
||||
print_ok("Domain resolves to box's IP address. [%s => %s]" % (env['PUBLIC_HOSTNAME'], env['PUBLIC_IP']))
|
||||
else:
|
||||
print_error("""This domain must resolve to your box's IP address (%s) in public DNS but it currently resolves
|
||||
to %s. It may take several hours for public DNS to update after a change. This problem may result from other
|
||||
issues listed here."""
|
||||
% (env['PUBLIC_IP'], ip))
|
||||
|
||||
# Check reverse DNS on the PUBLIC_HOSTNAME. Note that it might not be
|
||||
# a DNS zone if it is a subdomain of another domain we have a zone for.
|
||||
ipaddr_rev = dns.reversename.from_address(env['PUBLIC_IP'])
|
||||
existing_rdns = query_dns(ipaddr_rev, "PTR")
|
||||
if existing_rdns == domain:
|
||||
print_ok("Reverse DNS is set correctly at ISP. [%s => %s]" % (env['PUBLIC_IP'], env['PUBLIC_HOSTNAME']))
|
||||
else:
|
||||
print_error("""Your box's reverse DNS is currently %s, but it should be %s. Your ISP or cloud provider will have instructions
|
||||
on setting up reverse DNS for your box at %s.""" % (existing_rdns, domain, env['PUBLIC_IP']) )
|
||||
|
||||
def check_dns_zone(domain, env, dns_zonefiles):
|
||||
# We provide a DNS zone for the domain. It should have NS records set up
|
||||
# at the domain name's registrar pointing to this box.
|
||||
existing_ns = query_dns(domain, "NS")
|
||||
correct_ns = "ns1.BOX; ns2.BOX".replace("BOX", env['PUBLIC_HOSTNAME'])
|
||||
if existing_ns == correct_ns:
|
||||
print_ok("Nameservers are set correctly at registrar. [%s]" % correct_ns)
|
||||
else:
|
||||
print_error("""The nameservers set on this domain are incorrect. They are currently %s. Use your domain name registar's
|
||||
control panel to set the nameservers to %s."""
|
||||
% (existing_ns, correct_ns) )
|
||||
|
||||
# See if the domain's A record resolves to our PUBLIC_IP. This is already checked
|
||||
# for PUBLIC_HOSTNAME, for which it is required. For other domains it is just nice
|
||||
# to have if we want web.
|
||||
if domain != env['PUBLIC_HOSTNAME']:
|
||||
ip = query_dns(domain, "A")
|
||||
if ip == env['PUBLIC_IP']:
|
||||
print_ok("Domain resolves to this box's IP address. [%s => %s]" % (domain, env['PUBLIC_IP']))
|
||||
else:
|
||||
print_error("""This domain should resolve to your box's IP address (%s) if you would like the box to serve
|
||||
webmail or a website on this domain. The domain currently resolves to %s in public DNS. It may take several hours for
|
||||
public DNS to update after a change. This problem may result from other issues listed here.""" % (env['PUBLIC_IP'], ip))
|
||||
|
||||
# See if the domain has a DS record set.
|
||||
ds = query_dns(domain, "DS", nxdomain=None)
|
||||
ds_correct = open('/etc/nsd/zones/' + dns_zonefiles[domain] + '.ds').read().strip()
|
||||
ds_expected = re.sub(r"\S+\.\s+3600\s+IN\s+DS\s*", "", ds_correct)
|
||||
if ds == ds_expected:
|
||||
print_ok("DNS 'DS' record is set correctly at registrar.")
|
||||
elif ds == None:
|
||||
print_error("""This domain's DNS DS record is not set. The DS record is optional. The DS record activates DNSSEC.
|
||||
To set a DS record, you must follow the instructions provided by your domain name registrar and provide to them this information:""")
|
||||
print("")
|
||||
print(" " + ds_correct)
|
||||
print("")
|
||||
else:
|
||||
print_error("""This domain's DNS DS record is incorrect. The chain of trust is broken between the public DNS system
|
||||
and this machine's DNS server. It may take several hours for public DNS to update after a change. If you did not recently
|
||||
make a change, you must resolve this immediately by following the instructions provided by your domain name registrar and
|
||||
provide to them this information:""")
|
||||
print("")
|
||||
print(" " + ds_correct)
|
||||
print("")
|
||||
|
||||
def check_mx(domain, env):
|
||||
# Check the MX record.
|
||||
mx = query_dns(domain, "MX")
|
||||
expected_mx = "10 " + env['PUBLIC_HOSTNAME']
|
||||
if mx == expected_mx:
|
||||
print_ok("Domain's email is directed to this domain. [%s => %s]" % (domain, mx))
|
||||
else:
|
||||
print_error("""This domain's DNS MX record is incorrect. It is currently set to '%s' but should be '%s'. Mail will not
|
||||
be delivered to this box. It may take several hours for public DNS to update after a change. This problem may result from
|
||||
other issues listed here.""" % (mx, expected_mx))
|
||||
|
||||
def query_dns(qname, rtype, nxdomain='[Not Set]'):
|
||||
resolver = dns.resolver.get_default_resolver()
|
||||
try:
|
||||
response = dns.resolver.query(qname, rtype)
|
||||
except dns.resolver.NoNameservers:
|
||||
# Could not reach nameserver.
|
||||
raise
|
||||
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
|
||||
# Host did not have an answer for this query; not sure what the
|
||||
# difference is between the two exceptions.
|
||||
return nxdomain
|
||||
|
||||
# There may be multiple answers; concatenate the response. Remove trailing
|
||||
# periods from responses since that's how qnames are encoded in DNS but is
|
||||
# confusing for us.
|
||||
return "; ".join(str(r).rstrip('.') for r in response)
|
||||
|
||||
def check_ssl_cert(domain, env):
|
||||
# Check that SSL certificate is signed.
|
||||
|
||||
ssl_key, ssl_certificate, ssl_csr_path = get_domain_ssl_files(domain, env)
|
||||
|
||||
if not os.path.exists(ssl_certificate):
|
||||
print_error("The SSL certificate file for this domain is missing.")
|
||||
return
|
||||
|
||||
# Check that the certificate is good. In order to verify with openssl, we need to split out any
|
||||
# intermediary certificates in the chain (if any) from our certificate (at the top).
|
||||
|
||||
cert = open(ssl_certificate).read()
|
||||
mycert, chaincerts = re.match(r'(-*BEGIN CERTIFICATE-*.*?-*END CERTIFICATE-*)(.*)', cert, re.S).groups()
|
||||
|
||||
# This command returns a non-zero exit status in most cases, so trap errors.
|
||||
retcode, verifyoutput = shell('check_output', [
|
||||
"openssl",
|
||||
"verify", "-verbose",
|
||||
"-purpose", "sslserver", "-policy_check",]
|
||||
+ ([] if chaincerts.strip() == "" else ["-untrusted", "/dev/stdin"])
|
||||
+ [ssl_certificate],
|
||||
input=chaincerts.encode('ascii'),
|
||||
trap=True)
|
||||
|
||||
if "self signed" in verifyoutput:
|
||||
fingerprint = shell('check_output', [
|
||||
"openssl",
|
||||
"x509",
|
||||
"-in", ssl_certificate,
|
||||
"-noout",
|
||||
"-fingerprint"
|
||||
])
|
||||
fingerprint = re.sub(".*Fingerprint=", "", fingerprint).strip()
|
||||
|
||||
print_error("""The SSL certificate for this domain is currently self-signed. That's OK if you are willing to confirm security
|
||||
exceptions when you check your mail (either via IMAP or webmail), but if you are serving a website on this domain then users
|
||||
will not be able to access the site. When confirming security exceptions, check that the certificate fingerprint matches:""")
|
||||
print()
|
||||
print(" " + fingerprint)
|
||||
print()
|
||||
print_block("""You can purchase a signed certificate from many places. You will need to provide this Certificate Signing Request (CSR)
|
||||
to whoever you purchase the SSL certificate from:""")
|
||||
print()
|
||||
print(open(ssl_csr_path).read().strip())
|
||||
print()
|
||||
print_block("""When you purchase an SSL certificate you will receive a certificate in PEM format and possibly a file containing intermediate certificates in PEM format.
|
||||
If you receive intermediate certificates, use a text editor and paste your certificate on top and then the intermediate certificates
|
||||
below it. Save the file and place it onto this machine at %s.""" % ssl_certificate)
|
||||
|
||||
|
||||
elif retcode == 0:
|
||||
print_ok("SSL certificate is signed.")
|
||||
else:
|
||||
print_error("The SSL certificate has a problem:")
|
||||
print("")
|
||||
print(verifyoutput.strip())
|
||||
print("")
|
||||
|
||||
def print_ok(message):
|
||||
print_block(message, first_line="✓ ")
|
||||
|
||||
def print_error(message):
|
||||
print_block(message, first_line="✖ ")
|
||||
|
||||
def print_block(message, first_line=" "):
|
||||
print(first_line, end='')
|
||||
message = re.sub("\n\s*", " ", message)
|
||||
words = re.split("(\s+)", message)
|
||||
linelen = 0
|
||||
for w in words:
|
||||
if linelen + len(w) > 75:
|
||||
print()
|
||||
print(" ", end="")
|
||||
linelen = 0
|
||||
if linelen == 0 and w.strip() == "": continue
|
||||
print(w, end="")
|
||||
linelen += len(w)
|
||||
if linelen > 0:
|
||||
print()
|
||||
|
||||
if __name__ == "__main__":
|
||||
from utils import load_environment
|
||||
run_checks(load_environment())
|
||||
@@ -6,9 +6,9 @@ import os, os.path, urllib.parse, datetime, re, hashlib
|
||||
import rtyaml
|
||||
|
||||
from mailconfig import get_mail_domains
|
||||
from utils import shell, load_env_vars_from_file, safe_domain_name
|
||||
from utils import shell, load_env_vars_from_file, safe_domain_name, sort_domains
|
||||
|
||||
def get_dns_domains(env):
|
||||
def get_dns_zones(env):
|
||||
# What domains should we serve DNS for?
|
||||
domains = set()
|
||||
|
||||
@@ -32,12 +32,18 @@ def get_dns_domains(env):
|
||||
for domain in domains:
|
||||
zonefiles.append([domain, safe_domain_name(domain) + ".txt"])
|
||||
|
||||
# Sort the list so that the order is nice and so that nsd.conf has a
|
||||
# stable order so we don't rewrite the file & restart the service
|
||||
# meaninglessly.
|
||||
zone_order = sort_domains([ zone[0] for zone in zonefiles ], env)
|
||||
zonefiles.sort(key = lambda zone : zone_order.index(zone[0]) )
|
||||
|
||||
return zonefiles
|
||||
|
||||
|
||||
def do_dns_update(env):
|
||||
# What domains (and their zone filenames) should we build?
|
||||
zonefiles = get_dns_domains(env)
|
||||
zonefiles = get_dns_zones(env)
|
||||
|
||||
# Write zone files.
|
||||
os.makedirs('/etc/nsd/zones', exist_ok=True)
|
||||
@@ -322,7 +328,7 @@ server:
|
||||
|
||||
|
||||
# Append the zones.
|
||||
for domain, zonefile in sorted(zonefiles):
|
||||
for domain, zonefile in zonefiles:
|
||||
nsdconf += """
|
||||
zone:
|
||||
name: %s
|
||||
@@ -410,7 +416,7 @@ def sign_zone(domain, zonefile, env):
|
||||
########################################################################
|
||||
|
||||
def get_ds_records(env):
|
||||
zonefiles = get_dns_domains(env)
|
||||
zonefiles = get_dns_zones(env)
|
||||
ret = ""
|
||||
for domain, zonefile in zonefiles:
|
||||
fn = "/etc/nsd/zones/" + zonefile + ".ds"
|
||||
|
||||
@@ -16,6 +16,34 @@ def safe_domain_name(name):
|
||||
import urllib.parse
|
||||
return urllib.parse.quote(name, safe='')
|
||||
|
||||
def sort_domains(domain_names, env):
|
||||
# Put domain names in a nice sorted order. For web_update, PUBLIC_HOSTNAME
|
||||
# must appear first so it becomes the nginx default server.
|
||||
|
||||
# First group PUBLIC_HOSTNAME and its subdomains, then parent domains of PUBLIC_HOSTNAME, then other domains.
|
||||
groups = ( [], [], [] )
|
||||
for d in domain_names:
|
||||
if d == env['PUBLIC_HOSTNAME'] or d.endswith("." + env['PUBLIC_HOSTNAME']):
|
||||
groups[0].append(d)
|
||||
elif env['PUBLIC_HOSTNAME'].endswith("." + d):
|
||||
groups[1].append(d)
|
||||
else:
|
||||
groups[2].append(d)
|
||||
|
||||
# Within each group, sort parent domains before subdomains and after that sort lexicographically.
|
||||
def sort_group(group):
|
||||
# Find the top-most domains.
|
||||
top_domains = sorted(d for d in group if len([s for s in group if s.startswith("." + d)]) == 0)
|
||||
ret = []
|
||||
for d in top_domains:
|
||||
ret.append(d)
|
||||
ret.extend( sort_group([s for s in group if s.endswith("." + d)]) )
|
||||
return ret
|
||||
|
||||
groups = [sort_group(g) for g in groups]
|
||||
|
||||
return groups[0] + groups[1] + groups[2]
|
||||
|
||||
def exclusive_process(name):
|
||||
# Ensure that a process named `name` does not execute multiple
|
||||
# times concurrently.
|
||||
@@ -86,15 +114,33 @@ def is_pid_valid(pid):
|
||||
else:
|
||||
return True
|
||||
|
||||
def shell(method, cmd_args, env={}, capture_stderr=False, return_bytes=False):
|
||||
def shell(method, cmd_args, env={}, capture_stderr=False, return_bytes=False, trap=False, input=None):
|
||||
# A safe way to execute processes.
|
||||
# Some processes like apt-get require being given a sane PATH.
|
||||
import subprocess
|
||||
|
||||
env.update({ "PATH": "/sbin:/bin:/usr/sbin:/usr/bin" })
|
||||
stderr = None if not capture_stderr else subprocess.STDOUT
|
||||
ret = getattr(subprocess, method)(cmd_args, env=env, stderr=stderr)
|
||||
kwargs = {
|
||||
'env': env,
|
||||
'stderr': None if not capture_stderr else subprocess.STDOUT,
|
||||
}
|
||||
if method == "check_output" and input is not None:
|
||||
kwargs['input'] = input
|
||||
|
||||
if not trap:
|
||||
ret = getattr(subprocess, method)(cmd_args, **kwargs)
|
||||
else:
|
||||
try:
|
||||
ret = getattr(subprocess, method)(cmd_args, **kwargs)
|
||||
code = 0
|
||||
except subprocess.CalledProcessError as e:
|
||||
ret = e.output
|
||||
code = e.returncode
|
||||
if not return_bytes and isinstance(ret, bytes): ret = ret.decode("utf8")
|
||||
return ret
|
||||
if not trap:
|
||||
return ret
|
||||
else:
|
||||
return code, ret
|
||||
|
||||
def create_syslog_handler():
|
||||
import logging.handlers
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
import os, os.path
|
||||
|
||||
from mailconfig import get_mail_domains
|
||||
from utils import shell, safe_domain_name
|
||||
from utils import shell, safe_domain_name, sort_domains
|
||||
|
||||
def get_web_domains(env):
|
||||
# What domains should we serve HTTP/HTTPS for?
|
||||
@@ -19,7 +19,7 @@ def get_web_domains(env):
|
||||
|
||||
# Sort the list. Put PUBLIC_HOSTNAME first so it becomes the
|
||||
# default server (nginx's default_server).
|
||||
domains = sorted(domains, key = lambda domain : (domain != env["PUBLIC_HOSTNAME"], list(reversed(domain.split(".")))) )
|
||||
domains = sort_domains(domains, env)
|
||||
|
||||
return domains
|
||||
|
||||
@@ -49,6 +49,22 @@ def make_domain_config(domain, template, env):
|
||||
root = os.path.join(env["STORAGE_ROOT"], "www", safe_domain_name(test_domain))
|
||||
if os.path.exists(root): break
|
||||
|
||||
# What private key and SSL certificate will we use for this domain?
|
||||
ssl_key, ssl_certificate, csr_path = get_domain_ssl_files(domain, env)
|
||||
|
||||
# For hostnames created after the initial setup, ensure we have an SSL certificate
|
||||
# available. Make a self-signed one now if one doesn't exist.
|
||||
ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, csr_path, env)
|
||||
|
||||
# Replace substitution strings in the template & return.
|
||||
nginx_conf = template
|
||||
nginx_conf = nginx_conf.replace("$HOSTNAME", domain)
|
||||
nginx_conf = nginx_conf.replace("$ROOT", root)
|
||||
nginx_conf = nginx_conf.replace("$SSL_KEY", ssl_key)
|
||||
nginx_conf = nginx_conf.replace("$SSL_CERTIFICATE", ssl_certificate)
|
||||
return nginx_conf
|
||||
|
||||
def get_domain_ssl_files(domain, env):
|
||||
# What SSL private key will we use? Allow the user to override this, but
|
||||
# in many cases using the same private key for all domains would be fine.
|
||||
# Don't allow the user to override the key for PUBLIC_HOSTNAME because
|
||||
@@ -59,37 +75,44 @@ def make_domain_config(domain, template, env):
|
||||
ssl_key = alt_key
|
||||
|
||||
# What SSL certificate will we use? This has to be differnet for each
|
||||
# domain name. The certificate is already generated for PUBLIC_HOSTNAME.
|
||||
# For other domains, generate a self-signed certificate if one doesn't
|
||||
# already exist. See setup/mail.sh for documentation.
|
||||
# domain name. For PUBLIC_HOSTNAME, use the one we generated at set-up
|
||||
# time.
|
||||
if domain == env['PUBLIC_HOSTNAME']:
|
||||
ssl_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl/ssl_certificate.pem')
|
||||
else:
|
||||
ssl_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl/domains/%s_certifiate.pem' % safe_domain_name(domain))
|
||||
os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)
|
||||
if not os.path.exists(ssl_certificate):
|
||||
# Generate a new self-signed certificate using the same private key that we already have.
|
||||
|
||||
# Start with a CSR.
|
||||
csr = os.path.join(env["STORAGE_ROOT"], 'ssl/domains/%s_cert_sign_req.csr' % safe_domain_name(domain))
|
||||
shell("check_call", [
|
||||
"openssl", "req", "-new",
|
||||
"-key", ssl_key,
|
||||
"-out", csr,
|
||||
"-subj", "/C=%s/ST=/L=/O=/CN=%s" % (env["CSR_COUNTRY"], domain)])
|
||||
# Where would the CSR go?
|
||||
csr_path = os.path.join(env["STORAGE_ROOT"], 'ssl/domains/%s_cert_sign_req.csr' % safe_domain_name(domain))
|
||||
|
||||
# And then make the certificate.
|
||||
shell("check_call", [
|
||||
"openssl", "x509", "-req",
|
||||
"-days", "365",
|
||||
"-in", csr,
|
||||
"-signkey", ssl_key,
|
||||
"-out", ssl_certificate])
|
||||
return ssl_key, ssl_certificate, csr_path
|
||||
|
||||
def ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, csr_path, env):
|
||||
# For domains besides PUBLIC_HOSTNAME, generate a self-signed certificate if one doesn't
|
||||
# already exist. See setup/mail.sh for documentation.
|
||||
|
||||
if domain == env['PUBLIC_HOSTNAME']:
|
||||
return
|
||||
|
||||
if os.path.exists(ssl_certificate):
|
||||
return
|
||||
|
||||
os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)
|
||||
|
||||
# Generate a new self-signed certificate using the same private key that we already have.
|
||||
|
||||
# Start with a CSR.
|
||||
shell("check_call", [
|
||||
"openssl", "req", "-new",
|
||||
"-key", ssl_key,
|
||||
"-out", csr_path,
|
||||
"-subj", "/C=%s/ST=/L=/O=/CN=%s" % (env["CSR_COUNTRY"], domain)])
|
||||
|
||||
# And then make the certificate.
|
||||
shell("check_call", [
|
||||
"openssl", "x509", "-req",
|
||||
"-days", "365",
|
||||
"-in", csr_path,
|
||||
"-signkey", ssl_key,
|
||||
"-out", ssl_certificate])
|
||||
|
||||
# Replace substitution strings in the template & return.
|
||||
nginx_conf = template
|
||||
nginx_conf = nginx_conf.replace("$HOSTNAME", domain)
|
||||
nginx_conf = nginx_conf.replace("$ROOT", root)
|
||||
nginx_conf = nginx_conf.replace("$SSL_KEY", ssl_key)
|
||||
nginx_conf = nginx_conf.replace("$SSL_CERTIFICATE", ssl_certificate)
|
||||
return nginx_conf
|
||||
|
||||
Reference in New Issue
Block a user