mirror of
https://github.com/mail-in-a-box/mailinabox.git
synced 2026-03-14 17:27:23 +01:00
Merge remote-tracking branch 'upstream/main' into merge-upstream
# Conflicts: # management/status_checks.py # setup/webmail.sh
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
|
||||
import os, os.path, re, datetime, sys
|
||||
import dateutil.parser, dateutil.relativedelta, dateutil.tz
|
||||
from datetime import date
|
||||
import rtyaml
|
||||
from exclusiveprocess import Lock
|
||||
|
||||
@@ -167,6 +168,8 @@ def should_force_full(config, env):
|
||||
# since the last full backup is greater than half the size
|
||||
# of that full backup.
|
||||
inc_size = 0
|
||||
# Check if day of week is a weekend day
|
||||
weekend = date.today().weekday()>=5
|
||||
for bak in backup_status(env)["backups"]:
|
||||
if not bak["full"]:
|
||||
# Scan through the incremental backups cumulating
|
||||
@@ -175,12 +178,14 @@ def should_force_full(config, env):
|
||||
else:
|
||||
# ...until we reach the most recent full backup.
|
||||
# Return if we should to a full backup, which is based
|
||||
# on the size of the increments relative to the full
|
||||
# backup, as well as the age of the full backup.
|
||||
if inc_size > .5*bak["size"]:
|
||||
return True
|
||||
if dateutil.parser.parse(bak["date"]) + datetime.timedelta(days=config["min_age_in_days"]*10+1) < datetime.datetime.now(dateutil.tz.tzlocal()):
|
||||
return True
|
||||
# on whether it is a weekend day, the size of the
|
||||
# increments relative to the full backup, as well as
|
||||
# the age of the full backup.
|
||||
if weekend:
|
||||
if inc_size > .5*bak["size"]:
|
||||
return True
|
||||
if dateutil.parser.parse(bak["date"]) + datetime.timedelta(days=config["min_age_in_days"]*10+1) < datetime.datetime.now(dateutil.tz.tzlocal()):
|
||||
return True
|
||||
return False
|
||||
else:
|
||||
# If we got here there are no (full) backups, so make one.
|
||||
@@ -348,6 +353,7 @@ def perform_backup(full_backup):
|
||||
"--verbosity", "warning", "--no-print-statistics",
|
||||
"--archive-dir", backup_cache_dir,
|
||||
"--exclude", backup_root,
|
||||
"--exclude", os.path.join(env["STORAGE_ROOT"], "owncloud-backup"),
|
||||
"--volsize", "250",
|
||||
"--gpg-options", "'--cipher-algo=AES256'",
|
||||
"--allow-source-mismatch",
|
||||
@@ -429,6 +435,7 @@ def run_duplicity_verification():
|
||||
"--compare-data",
|
||||
"--archive-dir", backup_cache_dir,
|
||||
"--exclude", backup_root,
|
||||
"--exclude", os.path.join(env["STORAGE_ROOT"], "owncloud-backup"),
|
||||
*get_duplicity_additional_args(env),
|
||||
get_duplicity_target_url(config),
|
||||
env["STORAGE_ROOT"],
|
||||
|
||||
@@ -23,7 +23,7 @@ def get_ssl_certificates(env):
|
||||
# that the certificates are good for to the best certificate for
|
||||
# the domain.
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
||||
from cryptography.hazmat.primitives.asymmetric import dsa, rsa, ec
|
||||
from cryptography.x509 import Certificate
|
||||
|
||||
# The certificates are all stored here:
|
||||
@@ -44,6 +44,12 @@ def get_ssl_certificates(env):
|
||||
# the cert that it should be a
|
||||
# symlink to.
|
||||
continue
|
||||
if fn in ['ca','ca_certificate.pem','ca_private_key.pem']:
|
||||
# Ignore as these are for generating a temporary
|
||||
# "self-signed" certificate in a virgin setup (before
|
||||
# Let's Encrypt gives us a certificate).
|
||||
#
|
||||
continue
|
||||
fn = os.path.join(ssl_root, fn)
|
||||
if os.path.isfile(fn):
|
||||
yield fn
|
||||
@@ -68,13 +74,15 @@ def get_ssl_certificates(env):
|
||||
# Not a valid PEM format for a PEM type we care about.
|
||||
continue
|
||||
|
||||
# Is it a private key?
|
||||
if isinstance(pem, RSAPrivateKey):
|
||||
private_keys[pem.public_key().public_numbers()] = { "filename": fn, "key": pem }
|
||||
|
||||
# Is it a certificate?
|
||||
if isinstance(pem, Certificate):
|
||||
certificates.append({ "filename": fn, "cert": pem })
|
||||
# It is a private key
|
||||
elif (isinstance(pem, rsa.RSAPrivateKey)
|
||||
or isinstance(pem, dsa.DSAPrivateKey)
|
||||
or isinstance(pem, ec.EllipticCurvePrivateKey)):
|
||||
private_keys[pem.public_key().public_numbers()] = { "filename": fn, "key": pem }
|
||||
|
||||
|
||||
# Process the certificates.
|
||||
domains = { }
|
||||
@@ -518,7 +526,7 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
|
||||
# Check that the ssl_certificate & ssl_private_key files are good
|
||||
# for the provided domain.
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec
|
||||
from cryptography.x509 import Certificate
|
||||
|
||||
# The ssl_certificate file may contain a chain of certificates. We'll
|
||||
@@ -552,7 +560,9 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
|
||||
except ValueError as e:
|
||||
return (f"The private key file {ssl_private_key} is not a private key file: {e!s}", None)
|
||||
|
||||
if not isinstance(priv_key, RSAPrivateKey):
|
||||
if (not isinstance(priv_key, rsa.RSAPrivateKey)
|
||||
and not isinstance(priv_key, dsa.DSAPrivateKey)
|
||||
and not isinstance(priv_key, ec.EllipticCurvePrivateKey)):
|
||||
return ("The private key file %s is not a private key file." % ssl_private_key, None)
|
||||
|
||||
if priv_key.public_key().public_numbers() != cert.public_key().public_numbers():
|
||||
@@ -655,7 +665,7 @@ def load_pem(pem):
|
||||
msg = "File is not a valid PEM-formatted file."
|
||||
raise ValueError(msg)
|
||||
pem_type = pem_type.group(1)
|
||||
if pem_type in {b"RSA PRIVATE KEY", b"PRIVATE KEY"}:
|
||||
if pem_type.endswith(b"PRIVATE KEY"):
|
||||
return serialization.load_pem_private_key(pem, password=None, backend=default_backend())
|
||||
if pem_type == b"CERTIFICATE":
|
||||
return load_pem_x509_certificate(pem, default_backend())
|
||||
|
||||
@@ -293,26 +293,45 @@ def run_network_checks(env, output):
|
||||
# The user might have ended up on an IP address that was previously in use
|
||||
# by a spammer, or the user may be deploying on a residential network. We
|
||||
# will not be able to reliably send mail in these cases.
|
||||
rev_ip4 = ".".join(reversed(env['PUBLIC_IP'].split('.')))
|
||||
zen = query_dns(rev_ip4+'.zen.spamhaus.org', 'A', nxdomain=None, retry = False)
|
||||
evaluate_spamhaus_lookup(env['PUBLIC_IP'], 'IPv4', rev_ip4, output, zen)
|
||||
|
||||
if not env['PUBLIC_IPV6']:
|
||||
return
|
||||
|
||||
from ipaddress import IPv6Address
|
||||
|
||||
rev_ip6 = ".".join(reversed(IPv6Address(env['PUBLIC_IPV6']).exploded.split(':')))
|
||||
zen = query_dns(rev_ip6+'.zen.spamhaus.org', 'A', nxdomain=None, retry = False)
|
||||
evaluate_spamhaus_lookup(env['PUBLIC_IPV6'], 'IPv6', rev_ip6, output, zen)
|
||||
|
||||
|
||||
def evaluate_spamhaus_lookup(lookupaddress, lookuptype, lookupdomain, output, zen):
|
||||
# See https://www.spamhaus.org/news/article/807/using-our-public-mirrors-check-your-return-codes-now. for
|
||||
# information on spamhaus return codes
|
||||
rev_ip4 = ".".join(reversed(env['PUBLIC_IP'].split('.')))
|
||||
zen = query_dns(rev_ip4+'.zen.spamhaus.org', 'A', nxdomain=None)
|
||||
if zen is None:
|
||||
output.print_ok("IP address is not blacklisted by zen.spamhaus.org.")
|
||||
output.print_ok(f"{lookuptype} address is not blacklisted by zen.spamhaus.org.")
|
||||
elif zen == "[timeout]":
|
||||
output.print_warning("Connection to zen.spamhaus.org timed out. Could not determine whether this box's IP address is blacklisted. Please try again later.")
|
||||
output.print_warning(f"""Connection to zen.spamhaus.org timed out. Could not determine whether this box's
|
||||
{lookuptype} address is blacklisted. Please try again later.""")
|
||||
elif zen == "[Not Set]":
|
||||
output.print_warning("Could not connect to zen.spamhaus.org. Could not determine whether this box's IP address is blacklisted. Please try again later.")
|
||||
output.print_warning(f"""Could not connect to zen.spamhaus.org. Could not determine whether this box's
|
||||
{lookuptype} address is blacklisted. Please try again later.""")
|
||||
elif zen == "127.255.255.252":
|
||||
output.print_warning("Incorrect spamhaus query: %s. Could not determine whether this box's IP address is blacklisted." % (rev_ip4+'.zen.spamhaus.org'))
|
||||
output.print_warning(f"""Incorrect spamhaus query: {lookupdomain + '.zen.spamhaus.org'}. Could not determine whether
|
||||
this box's {lookuptype} address is blacklisted.""")
|
||||
elif zen == "127.255.255.254":
|
||||
output.print_warning("Mail-in-a-Box is configured to use a public DNS server. This is not supported by spamhaus. Could not determine whether this box's IP address is blacklisted.")
|
||||
output.print_warning(f"""Mail-in-a-Box is configured to use a public DNS server. This is not supported by
|
||||
spamhaus. Could not determine whether this box's {lookuptype} address is blacklisted.""")
|
||||
elif zen == "127.255.255.255":
|
||||
output.print_warning("Too many queries have been performed on the spamhaus server. Could not determine whether this box's IP address is blacklisted.")
|
||||
output.print_warning(f"""Too many queries have been performed on the spamhaus server. Could not determine
|
||||
whether this box's {lookuptype} address is blacklisted.""")
|
||||
else:
|
||||
output.print_error("""The IP address of this machine {} is listed in the Spamhaus Block List (code {}),
|
||||
which may prevent recipients from receiving your email. See http://www.spamhaus.org/query/ip/{}.""".format(env['PUBLIC_IP'], zen, env['PUBLIC_IP']))
|
||||
output.print_error(f"""The {lookuptype} address of this machine {lookupaddress} is listed in the Spamhaus Block
|
||||
List (code {zen}), which may prevent recipients from receiving your email. See
|
||||
http://www.spamhaus.org/query/ip/{lookupaddress}.""")
|
||||
|
||||
|
||||
def run_domain_checks(rounded_time, env, output, pool, domains_to_check=None):
|
||||
# Get the list of domains we handle mail for.
|
||||
@@ -532,6 +551,8 @@ def check_dns_zone(domain, env, output, dns_zonefiles):
|
||||
# Check that each custom secondary nameserver resolves the IP address.
|
||||
|
||||
if custom_secondary_ns and not probably_external_dns:
|
||||
SOARecord = query_dns(domain, "SOA", at=env['PUBLIC_IP'])# Explicitly ask the local dns server.
|
||||
|
||||
for ns in custom_secondary_ns:
|
||||
# We must first resolve the nameserver to an IP address so we can query it.
|
||||
ns_ips = query_dns(ns, "A")
|
||||
@@ -541,15 +562,36 @@ def check_dns_zone(domain, env, output, dns_zonefiles):
|
||||
# Choose the first IP if nameserver returns multiple
|
||||
ns_ip = ns_ips.split('; ')[0]
|
||||
|
||||
checkSOA = True
|
||||
|
||||
# Now query it to see what it says about this domain.
|
||||
ip = query_dns(domain, "A", at=ns_ip, nxdomain=None)
|
||||
if ip == correct_ip:
|
||||
output.print_ok("Secondary nameserver %s resolved the domain correctly." % ns)
|
||||
output.print_ok(f"Secondary nameserver {ns} resolved the domain correctly.")
|
||||
elif ip is None:
|
||||
output.print_error("Secondary nameserver %s is not configured to resolve this domain." % ns)
|
||||
output.print_error(f"Secondary nameserver {ns} is not configured to resolve this domain.")
|
||||
# No need to check SOA record if not configured as nameserver
|
||||
checkSOA = False
|
||||
elif ip == '[timeout]':
|
||||
output.print_error(f"Secondary nameserver {ns} did not resolve this domain, result: {ip}")
|
||||
checkSOA = False
|
||||
else:
|
||||
output.print_error(f"Secondary nameserver {ns} is not configured correctly. (It resolved this domain as {ip}. It should be {correct_ip}.)")
|
||||
|
||||
if checkSOA:
|
||||
# Check that secondary DNS server is synchronized with our primary DNS server. Simplified by checking the SOA record which has a version number
|
||||
SOASecondary = query_dns(domain, "SOA", at=ns_ip)
|
||||
|
||||
if SOARecord == SOASecondary:
|
||||
output.print_ok(f"Secondary nameserver {ns} has consistent SOA record.")
|
||||
elif SOARecord == '[Not Set]':
|
||||
output.print_error(f"Secondary nameserver {ns} has no SOA record configured.")
|
||||
elif SOARecord == '[timeout]':
|
||||
output.print_error(f"Secondary nameserver {ns} timed out on checking SOA record.")
|
||||
else:
|
||||
output.print_error(f"""Secondary nameserver {ns} has inconsistent SOA record (primary: {SOARecord} versus secondary: {SOASecondary}).
|
||||
Check that synchronization between secondary and primary DNS servers is properly set-up.""")
|
||||
|
||||
def check_dns_zone_suggestions(domain, env, output, dns_zonefiles, domains_with_a_records):
|
||||
# Warn if a custom DNS record is preventing this or the automatic www redirect from
|
||||
# being served.
|
||||
|
||||
Reference in New Issue
Block a user