mirror of https://github.com/mail-in-a-box/mailinabox.git synced 2025-04-01 23:57:05 +00:00
Commit 10ca4c4958 by Teal Dulcet, 2025-02-17 14:20:14 -08:00 (committed by GitHub)
20 changed files with 372 additions and 311 deletions

View File

@ -52,7 +52,7 @@ class AuthService:
msg = "Authorization header invalid." msg = "Authorization header invalid."
raise ValueError(msg) raise ValueError(msg)
if username.strip() == "" and password.strip() == "": if not username.strip() and not password.strip():
msg = "No email address, password, session key, or API key provided." msg = "No email address, password, session key, or API key provided."
raise ValueError(msg) raise ValueError(msg)
@ -73,7 +73,7 @@ class AuthService:
self.sessions[sessionid] = session self.sessions[sessionid] = session
# If no password was given, but a username was given, we're missing some information. # If no password was given, but a username was given, we're missing some information.
elif password.strip() == "": elif not password.strip():
msg = "Enter a password." msg = "Enter a password."
raise ValueError(msg) raise ValueError(msg)

View File

@ -14,6 +14,7 @@ import rtyaml
from exclusiveprocess import Lock from exclusiveprocess import Lock
from utils import load_environment, shell, wait_for_service from utils import load_environment, shell, wait_for_service
import operator
def backup_status(env): def backup_status(env):
# If backups are disabled, return no status. # If backups are disabled, return no status.
@ -32,14 +33,14 @@ def backup_status(env):
def reldate(date, ref, clip): def reldate(date, ref, clip):
if ref < date: return clip if ref < date: return clip
rd = dateutil.relativedelta.relativedelta(ref, date) rd = dateutil.relativedelta.relativedelta(ref, date)
if rd.years > 1: return "%d years, %d months" % (rd.years, rd.months) if rd.years > 1: return f"{rd.years:d} years, {rd.months:d} months"
if rd.years == 1: return "%d year, %d months" % (rd.years, rd.months) if rd.years == 1: return f"{rd.years:d} year, {rd.months:d} months"
if rd.months > 1: return "%d months, %d days" % (rd.months, rd.days) if rd.months > 1: return f"{rd.months:d} months, {rd.days:d} days"
if rd.months == 1: return "%d month, %d days" % (rd.months, rd.days) if rd.months == 1: return f"{rd.months:d} month, {rd.days:d} days"
if rd.days >= 7: return "%d days" % rd.days if rd.days >= 7: return f"{rd.days:d} days"
if rd.days > 1: return "%d days, %d hours" % (rd.days, rd.hours) if rd.days > 1: return f"{rd.days:d} days, {rd.hours:d} hours"
if rd.days == 1: return "%d day, %d hours" % (rd.days, rd.hours) if rd.days == 1: return f"{rd.days:d} day, {rd.hours:d} hours"
return "%d hours, %d minutes" % (rd.hours, rd.minutes) return f"{rd.hours:d} hours, {rd.minutes:d} minutes"
# Get duplicity collection status and parse for a list of backups. # Get duplicity collection status and parse for a list of backups.
def parse_line(line): def parse_line(line):
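
The `%d`-style substitutions above become f-strings with an explicit `:d` format spec, which behaves identically for integers (the relativedelta attributes used here are ints; note `%d` silently truncates floats while `:d` raises). A quick sketch with hypothetical values:

    rd_years, rd_months = 2, 3
    assert "%d years, %d months" % (rd_years, rd_months) == f"{rd_years:d} years, {rd_months:d} months"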
@ -91,7 +92,7 @@ def backup_status(env):
# Ensure the rows are sorted reverse chronologically. # Ensure the rows are sorted reverse chronologically.
# This is relied on by should_force_full() and the next step. # This is relied on by should_force_full() and the next step.
backups = sorted(backups.values(), key = lambda b : b["date"], reverse=True) backups = sorted(backups.values(), key = operator.itemgetter("date"), reverse=True)
# Get the average size of incremental backups, the size of the # Get the average size of incremental backups, the size of the
# most recent full backup, and the date of the most recent # most recent full backup, and the date of the most recent
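
Using `operator.itemgetter` instead of a lambda for the sort key is behaviourally equivalent here; a small sketch with made-up data:

    import operator
    backups = [{"date": "2025-02-02"}, {"date": "2025-01-01"}]
    assert sorted(backups, key=operator.itemgetter("date")) == sorted(backups, key=lambda b: b["date"])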
@ -129,7 +130,7 @@ def backup_status(env):
# It still can't be deleted until it's old enough. # It still can't be deleted until it's old enough.
est_deleted_on = max(est_time_of_next_full, first_date + datetime.timedelta(days=config["min_age_in_days"])) est_deleted_on = max(est_time_of_next_full, first_date + datetime.timedelta(days=config["min_age_in_days"]))
deleted_in = "approx. %d days" % round((est_deleted_on-now).total_seconds()/60/60/24 + .5) deleted_in = f"approx. {round((est_deleted_on-now).total_seconds()/60/60/24 + .5):d} days"
# When will a backup be deleted? Set the deleted_in field of each backup. # When will a backup be deleted? Set the deleted_in field of each backup.
saw_full = False saw_full = False
@@ -177,10 +178,8 @@ def should_force_full(config, env):
 		if dateutil.parser.parse(bak["date"]) + datetime.timedelta(days=config["min_age_in_days"]*10+1) < datetime.datetime.now(dateutil.tz.tzlocal()):
 			return True
 		return False
-	else:
-		# If we got here there are no (full) backups, so make one.
-		# (I love for/else blocks. Here it's just to show off.)
-		return True
+	# If we got here there are no (full) backups, so make one.
+	return True

 def get_passphrase(env):
 	# Get the encryption passphrase. secret_key.txt is 2048 random

@@ -192,7 +191,9 @@ def get_passphrase(env):
 	backup_root = os.path.join(env["STORAGE_ROOT"], 'backup')
 	with open(os.path.join(backup_root, 'secret_key.txt'), encoding="utf-8") as f:
 		passphrase = f.readline().strip()
-	if len(passphrase) < 43: raise Exception("secret_key.txt's first line is too short!")
+	if len(passphrase) < 43:
+		msg = "secret_key.txt's first line is too short!"
+		raise Exception(msg)

 	return passphrase
@ -236,7 +237,7 @@ def get_duplicity_additional_args(env):
f"--ssh-options='-i /root/.ssh/id_rsa_miab -p {port}'", f"--ssh-options='-i /root/.ssh/id_rsa_miab -p {port}'",
f"--rsync-options='-e \"/usr/bin/ssh -oStrictHostKeyChecking=no -oBatchMode=yes -p {port} -i /root/.ssh/id_rsa_miab\"'", f"--rsync-options='-e \"/usr/bin/ssh -oStrictHostKeyChecking=no -oBatchMode=yes -p {port} -i /root/.ssh/id_rsa_miab\"'",
] ]
elif get_target_type(config) == 's3': if get_target_type(config) == 's3':
# See note about hostname in get_duplicity_target_url. # See note about hostname in get_duplicity_target_url.
# The region name, which is required by some non-AWS endpoints, # The region name, which is required by some non-AWS endpoints,
# is saved inside the username portion of the URL. # is saved inside the username portion of the URL.
@ -346,7 +347,7 @@ def perform_backup(full_backup):
shell('check_call', [ shell('check_call', [
"/usr/bin/duplicity", "/usr/bin/duplicity",
"remove-older-than", "remove-older-than",
"%dD" % config["min_age_in_days"], "{:d}D".format(config["min_age_in_days"]),
"--verbosity", "error", "--verbosity", "error",
"--archive-dir", backup_cache_dir, "--archive-dir", backup_cache_dir,
"--force", "--force",
@ -447,7 +448,7 @@ def list_target_files(config):
if target.scheme == "file": if target.scheme == "file":
return [(fn, os.path.getsize(os.path.join(target.path, fn))) for fn in os.listdir(target.path)] return [(fn, os.path.getsize(os.path.join(target.path, fn))) for fn in os.listdir(target.path)]
elif target.scheme == "rsync": if target.scheme == "rsync":
rsync_fn_size_re = re.compile(r'.* ([^ ]*) [^ ]* [^ ]* (.*)') rsync_fn_size_re = re.compile(r'.* ([^ ]*) [^ ]* [^ ]* (.*)')
rsync_target = '{host}:{path}' rsync_target = '{host}:{path}'
@@ -463,9 +464,8 @@ def list_target_files(config):
 		target_path = target.path
 		if not target_path.endswith('/'):
-			target_path = target_path + '/'
-		if target_path.startswith('/'):
-			target_path = target_path[1:]
+			target_path += "/"
+		target_path = target_path.removeprefix('/')

 		rsync_command = [ 'rsync',
 				'-e',
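
`str.removeprefix` (Python 3.9+) replaces the startswith/slice pair and is a no-op when the prefix is absent; for example:

    assert "/backups/".removeprefix("/") == "backups/"
    assert "backups/".removeprefix("/") == "backups/"  # unchanged when there is no leading slash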
@@ -485,23 +485,22 @@ def list_target_files(config):
 			if match:
 				ret.append( (match.groups()[1], int(match.groups()[0].replace(',',''))) )
 			return ret
-		else:
-			if 'Permission denied (publickey).' in listing:
-				reason = "Invalid user or check you correctly copied the SSH key."
-			elif 'No such file or directory' in listing:
-				reason = f"Provided path {target_path} is invalid."
-			elif 'Network is unreachable' in listing:
-				reason = f"The IP address {target.hostname} is unreachable."
-			elif 'Could not resolve hostname' in listing:
-				reason = f"The hostname {target.hostname} cannot be resolved."
-			else:
-				reason = ("Unknown error."
-						"Please check running 'management/backup.py --verify'"
-						"from mailinabox sources to debug the issue.")
-			msg = f"Connection to rsync host failed: {reason}"
-			raise ValueError(msg)
+		if 'Permission denied (publickey).' in listing:
+			reason = "Invalid user or check you correctly copied the SSH key."
+		elif 'No such file or directory' in listing:
+			reason = f"Provided path {target_path} is invalid."
+		elif 'Network is unreachable' in listing:
+			reason = f"The IP address {target.hostname} is unreachable."
+		elif 'Could not resolve hostname' in listing:
+			reason = f"The hostname {target.hostname} cannot be resolved."
+		else:
+			reason = ("Unknown error."
+					"Please check running 'management/backup.py --verify'"
+					"from mailinabox sources to debug the issue.")
+		msg = f"Connection to rsync host failed: {reason}"
+		raise ValueError(msg)

-	elif target.scheme == "s3":
+	if target.scheme == "s3":
 		import boto3.s3
 		from botocore.exceptions import ClientError
@ -513,7 +512,7 @@ def list_target_files(config):
if path == '/': if path == '/':
path = '' path = ''
if bucket == "": if not bucket:
msg = "Enter an S3 bucket name." msg = "Enter an S3 bucket name."
raise ValueError(msg) raise ValueError(msg)
@ -531,7 +530,7 @@ def list_target_files(config):
except ClientError as e: except ClientError as e:
raise ValueError(e) raise ValueError(e)
return backup_list return backup_list
elif target.scheme == 'b2': if target.scheme == 'b2':
from b2sdk.v1 import InMemoryAccountInfo, B2Api from b2sdk.v1 import InMemoryAccountInfo, B2Api
from b2sdk.v1.exception import NonExistentBucket from b2sdk.v1.exception import NonExistentBucket
info = InMemoryAccountInfo() info = InMemoryAccountInfo()
@@ -550,8 +549,7 @@ def list_target_files(config):
 			raise ValueError(msg)
 		return [(key.file_name, key.size) for key, _ in bucket.ls()]

-	else:
-		raise ValueError(config["target"])
+	raise ValueError(config["target"])


 def backup_set_custom(env, target, target_user, target_pass, min_age):

@@ -605,8 +603,7 @@ def get_backup_config(env, for_save=False, for_ui=False):
 	# authentication details. The user will have to re-enter it.
 	if for_ui:
 		for field in ("target_user", "target_pass"):
-			if field in config:
-				del config[field]
+			config.pop(field, None)

 	# helper fields for the admin
 	config["file_target_directory"] = os.path.join(backup_root, 'encrypted')

View File

@ -38,7 +38,7 @@ with contextlib.suppress(OSError):
csr_country_codes = [] csr_country_codes = []
with open(os.path.join(os.path.dirname(me), "csr_country_codes.tsv"), encoding="utf-8") as f: with open(os.path.join(os.path.dirname(me), "csr_country_codes.tsv"), encoding="utf-8") as f:
for line in f: for line in f:
if line.strip() == "" or line.startswith("#"): continue if not line.strip() or line.startswith("#"): continue
code, name = line.strip().split("\t")[0:2] code, name = line.strip().split("\t")[0:2]
csr_country_codes.append((code, name)) csr_country_codes.append((code, name))
@@ -93,12 +93,11 @@ def authorized_personnel_only(viewfunc):
 		if request.headers.get('Accept') in {None, "", "*/*"}:
 			# Return plain text output.
 			return Response(error+"\n", status=status, mimetype='text/plain', headers=headers)
-		else:
-			# Return JSON output.
-			return Response(json.dumps({
-				"status": "error",
-				"reason": error,
-				})+"\n", status=status, mimetype='application/json', headers=headers)
+		# Return JSON output.
+		return Response(json.dumps({
+			"status": "error",
+			"reason": error,
+			})+"\n", status=status, mimetype='application/json', headers=headers)

 	return newview

@@ -148,13 +147,12 @@ def login():
 				"status": "missing-totp-token",
 				"reason": str(e),
 			})
-		else:
-			# Log the failed login
-			log_failed_login(request)
-			return json_response({
-				"status": "invalid",
-				"reason": str(e),
-			})
+		# Log the failed login
+		log_failed_login(request)
+		return json_response({
+			"status": "invalid",
+			"reason": str(e),
+		})

 	# Return a new session for the user.
 	resp = {
@ -164,7 +162,7 @@ def login():
"api_key": auth_service.create_session_key(email, env, type='login'), "api_key": auth_service.create_session_key(email, env, type='login'),
} }
app.logger.info(f"New login session created for {email}") app.logger.info("New login session created for %s", email)
# Return. # Return.
return json_response(resp) return json_response(resp)
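
Passing the value as a logger argument instead of pre-formatting with an f-string defers interpolation until the record is actually emitted and keeps the message template constant. A minimal sketch with a placeholder value:

    import logging
    logger = logging.getLogger(__name__)
    email = "user@example.com"  # placeholder
    logger.info("New login session created for %s", email)  # formatted only if INFO is enabled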
@ -173,7 +171,7 @@ def login():
def logout(): def logout():
try: try:
email, _ = auth_service.authenticate(request, env, logout=True) email, _ = auth_service.authenticate(request, env, logout=True)
app.logger.info(f"{email} logged out") app.logger.info("%s logged out", email)
except ValueError: except ValueError:
pass pass
finally: finally:
@@ -186,8 +184,7 @@ def logout():
 def mail_users():
 	if request.args.get("format", "") == "json":
 		return json_response(get_mail_users_ex(env, with_archived=True))
-	else:
-		return "".join(x+"\n" for x in get_mail_users(env))
+	return "".join(x+"\n" for x in get_mail_users(env))

 @app.route('/mail/users/add', methods=['POST'])
 @authorized_personnel_only

@@ -257,8 +254,7 @@ def mail_user_privs_remove():
 def mail_aliases():
 	if request.args.get("format", "") == "json":
 		return json_response(get_mail_aliases_ex(env))
-	else:
-		return "".join(address+"\t"+receivers+"\t"+(senders or "")+"\n" for address, receivers, senders, auto in get_mail_aliases(env))
+	return "".join(address+"\t"+receivers+"\t"+(senders or "")+"\n" for address, receivers, senders, auto in get_mail_aliases(env))

 @app.route('/mail/aliases/add', methods=['POST'])
 @authorized_personnel_only
@ -309,7 +305,7 @@ def dns_get_secondary_nameserver():
def dns_set_secondary_nameserver(): def dns_set_secondary_nameserver():
from dns_update import set_secondary_dns from dns_update import set_secondary_dns
try: try:
return set_secondary_dns([ns.strip() for ns in re.split(r"[, ]+", request.form.get('hostnames') or "") if ns.strip() != ""], env) return set_secondary_dns([ns.strip() for ns in re.split(r"[, ]+", request.form.get('hostnames') or "") if ns.strip()], env)
except ValueError as e: except ValueError as e:
return (str(e), 400) return (str(e), 400)
@ -378,13 +374,13 @@ def dns_set_record(qname, rtype="A"):
# Get the existing records matching the qname and rtype. # Get the existing records matching the qname and rtype.
return dns_get_records(qname, rtype) return dns_get_records(qname, rtype)
elif request.method in {"POST", "PUT"}: if request.method in {"POST", "PUT"}:
# There is a default value for A/AAAA records. # There is a default value for A/AAAA records.
if rtype in {"A", "AAAA"} and value == "": if rtype in {"A", "AAAA"} and not value:
value = request.environ.get("HTTP_X_FORWARDED_FOR") # normally REMOTE_ADDR but we're behind nginx as a reverse proxy value = request.environ.get("HTTP_X_FORWARDED_FOR") # normally REMOTE_ADDR but we're behind nginx as a reverse proxy
# Cannot add empty records. # Cannot add empty records.
if value == '': if not value:
return ("No value for the record provided.", 400) return ("No value for the record provided.", 400)
if request.method == "POST": if request.method == "POST":
@ -398,7 +394,7 @@ def dns_set_record(qname, rtype="A"):
action = "set" action = "set"
elif request.method == "DELETE": elif request.method == "DELETE":
if value == '': if not value:
# Delete all records for this qname-type pair. # Delete all records for this qname-type pair.
value = None value = None
else: else:
@@ -408,11 +404,12 @@ def dns_set_record(qname, rtype="A"):
 		if set_custom_dns_record(qname, rtype, value, action, env):
 			return do_dns_update(env) or "Something isn't right."
-		return "OK"
 	except ValueError as e:
 		return (str(e), 400)
+	return "OK"

 @app.route('/dns/dump')
 @authorized_personnel_only
 def dns_get_dump():
@@ -536,8 +533,8 @@ def totp_post_disable():
 		return (str(e), 400)
 	if result: # success
 		return "OK"
-	else: # error
-		return ("Invalid user or MFA id.", 400)
+	# error
+	return ("Invalid user or MFA id.", 400)

 # WEB
@@ -621,8 +618,7 @@ def needs_reboot():
 	from status_checks import is_reboot_needed_due_to_package_installation
 	if is_reboot_needed_due_to_package_installation():
 		return json_response(True)
-	else:
-		return json_response(False)
+	return json_response(False)

 @app.route('/system/reboot', methods=["POST"])
 @authorized_personnel_only

@@ -631,8 +627,7 @@ def do_reboot():
 	from status_checks import is_reboot_needed_due_to_package_installation
 	if is_reboot_needed_due_to_package_installation():
 		return utils.shell("check_output", ["/sbin/shutdown", "-r", "now"], capture_stderr=True)
-	else:
-		return "No reboot is required, so it is not allowed."
+	return "No reboot is required, so it is not allowed."

 @app.route('/system/backup/status')
@@ -694,8 +689,7 @@ def check_request_cookie_for_admin_access():
 	if not session: return False
 	privs = get_mail_user_privileges(session["email"], env)
 	if not isinstance(privs, list): return False
-	if "admin" not in privs: return False
-	return True
+	return "admin" in privs

 def authorized_personnel_only_via_cookie(f):
 	@wraps(f)
@ -709,7 +703,7 @@ def authorized_personnel_only_via_cookie(f):
@authorized_personnel_only_via_cookie @authorized_personnel_only_via_cookie
def munin_static_file(filename=""): def munin_static_file(filename=""):
# Proxy the request to static files. # Proxy the request to static files.
if filename == "": filename = "index.html" if not filename: filename = "index.html"
return send_from_directory("/var/cache/munin/www", filename) return send_from_directory("/var/cache/munin/www", filename)
@app.route('/munin/cgi-graph/<path:filename>') @app.route('/munin/cgi-graph/<path:filename>')
@ -738,12 +732,12 @@ def munin_cgi(filename):
# -c "/usr/lib/munin/cgi/munin-cgi-graph" passes the command to run as munin # -c "/usr/lib/munin/cgi/munin-cgi-graph" passes the command to run as munin
# "%s" is a placeholder for where the request's querystring will be added # "%s" is a placeholder for where the request's querystring will be added
if filename == "": if not filename:
return ("a path must be specified", 404) return ("a path must be specified", 404)
query_str = request.query_string.decode("utf-8", 'ignore') query_str = request.query_string.decode("utf-8", 'ignore')
env = {'PATH_INFO': '/%s/' % filename, 'REQUEST_METHOD': 'GET', 'QUERY_STRING': query_str} env = {'PATH_INFO': f'/{filename}/', 'REQUEST_METHOD': 'GET', 'QUERY_STRING': query_str}
code, binout = utils.shell('check_output', code, binout = utils.shell('check_output',
COMMAND.split(" ", 5), COMMAND.split(" ", 5),
# Using a maxsplit of 5 keeps the last arguments together # Using a maxsplit of 5 keeps the last arguments together
@ -777,7 +771,7 @@ def log_failed_login(request):
# We need to add a timestamp to the log message, otherwise /dev/log will eat the "duplicate" # We need to add a timestamp to the log message, otherwise /dev/log will eat the "duplicate"
# message. # message.
app.logger.warning( f"Mail-in-a-Box Management Daemon: Failed login attempt from ip {ip} - timestamp {time.time()}") app.logger.warning("Mail-in-a-Box Management Daemon: Failed login attempt from ip %s - timestamp %s", ip, time.time())
# APP # APP

View File

@ -11,7 +11,6 @@ import dns.resolver
from utils import shell, load_env_vars_from_file, safe_domain_name, sort_domains, get_ssh_port from utils import shell, load_env_vars_from_file, safe_domain_name, sort_domains, get_ssh_port
from ssl_certificates import get_ssl_certificates, check_certificate from ssl_certificates import get_ssl_certificates, check_certificate
import contextlib
# From https://stackoverflow.com/questions/3026957/how-to-validate-a-domain-name-using-regex-php/16491074#16491074 # From https://stackoverflow.com/questions/3026957/how-to-validate-a-domain-name-using-regex-php/16491074#16491074
# This regular expression matches domain names according to RFCs, it also accepts fqdn with an leading dot, # This regular expression matches domain names according to RFCs, it also accepts fqdn with an leading dot,
@@ -124,8 +123,7 @@ def do_dns_update(env, force=False):
 	if len(updated_domains) == 0:
 		# if nothing was updated (except maybe OpenDKIM's files), don't show any output
 		return ""
-	else:
-		return "updated DNS: " + ",".join(updated_domains) + "\n"
+	return "updated DNS: " + ",".join(updated_domains) + "\n"

 ########################################################################
@ -187,7 +185,7 @@ def build_zone(domain, domain_properties, additional_records, env, is_zone=True)
# is managed outside of the box. # is managed outside of the box.
if is_zone: if is_zone:
# Obligatory NS record to ns1.PRIMARY_HOSTNAME. # Obligatory NS record to ns1.PRIMARY_HOSTNAME.
records.append((None, "NS", "ns1.%s." % env["PRIMARY_HOSTNAME"], False)) records.append((None, "NS", "ns1.{}.".format(env["PRIMARY_HOSTNAME"]), False))
# NS record to ns2.PRIMARY_HOSTNAME or whatever the user overrides. # NS record to ns2.PRIMARY_HOSTNAME or whatever the user overrides.
# User may provide one or more additional nameservers # User may provide one or more additional nameservers
@ -254,19 +252,19 @@ def build_zone(domain, domain_properties, additional_records, env, is_zone=True)
# was set. So set has_rec_base to a clone of the current set of DNS settings, and don't update # was set. So set has_rec_base to a clone of the current set of DNS settings, and don't update
# during this process. # during this process.
has_rec_base = list(records) has_rec_base = list(records)
a_expl = "Required. May have a different value. Sets the IP address that %s resolves to for web hosting and other services besides mail. The A record must be present but its value does not affect mail delivery." % domain a_expl = f"Required. May have a different value. Sets the IP address that {domain} resolves to for web hosting and other services besides mail. The A record must be present but its value does not affect mail delivery."
if domain_properties[domain]["auto"]: if domain_properties[domain]["auto"]:
if domain.startswith(("ns1.", "ns2.")): a_expl = False # omit from 'External DNS' page since this only applies if box is its own DNS server if domain.startswith(("ns1.", "ns2.")): a_expl = False # omit from 'External DNS' page since this only applies if box is its own DNS server
if domain.startswith("www."): a_expl = "Optional. Sets the IP address that %s resolves to so that the box can provide a redirect to the parent domain." % domain if domain.startswith("www."): a_expl = f"Optional. Sets the IP address that {domain} resolves to so that the box can provide a redirect to the parent domain."
if domain.startswith("mta-sts."): a_expl = "Optional. MTA-STS Policy Host serving /.well-known/mta-sts.txt." if domain.startswith("mta-sts."): a_expl = "Optional. MTA-STS Policy Host serving /.well-known/mta-sts.txt."
if domain.startswith("autoconfig."): a_expl = "Provides email configuration autodiscovery support for Thunderbird Autoconfig." if domain.startswith("autoconfig."): a_expl = "Provides email configuration autodiscovery support for Thunderbird Autoconfig."
if domain.startswith("autodiscover."): a_expl = "Provides email configuration autodiscovery support for Z-Push ActiveSync Autodiscover." if domain.startswith("autodiscover."): a_expl = "Provides email configuration autodiscovery support for Z-Push ActiveSync Autodiscover."
defaults = [ defaults = [
(None, "A", env["PUBLIC_IP"], a_expl), (None, "A", env["PUBLIC_IP"], a_expl),
(None, "AAAA", env.get('PUBLIC_IPV6'), "Optional. Sets the IPv6 address that %s resolves to, e.g. for web hosting. (It is not necessary for receiving mail on this domain.)" % domain), (None, "AAAA", env.get('PUBLIC_IPV6'), f"Optional. Sets the IPv6 address that {domain} resolves to, e.g. for web hosting. (It is not necessary for receiving mail on this domain.)"),
] ]
for qname, rtype, value, explanation in defaults: for qname, rtype, value, explanation in defaults:
if value is None or value.strip() == "": continue # skip IPV6 if not set if value is None or not value.strip(): continue # skip IPV6 if not set
if not is_zone and qname == "www": continue # don't create any default 'www' subdomains on what are themselves subdomains if not is_zone and qname == "www": continue # don't create any default 'www' subdomains on what are themselves subdomains
# Set the default record, but not if: # Set the default record, but not if:
# (1) there is not a user-set record of the same type already # (1) there is not a user-set record of the same type already
@ -281,13 +279,13 @@ def build_zone(domain, domain_properties, additional_records, env, is_zone=True)
if domain_properties[domain]["mail"]: if domain_properties[domain]["mail"]:
# The MX record says where email for the domain should be delivered: Here! # The MX record says where email for the domain should be delivered: Here!
if not has_rec(None, "MX", prefix="10 "): if not has_rec(None, "MX", prefix="10 "):
records.append((None, "MX", "10 %s." % env["PRIMARY_HOSTNAME"], "Required. Specifies the hostname (and priority) of the machine that handles @%s mail." % domain)) records.append((None, "MX", "10 {}.".format(env["PRIMARY_HOSTNAME"]), f"Required. Specifies the hostname (and priority) of the machine that handles @{domain} mail."))
# SPF record: Permit the box ('mx', see above) to send mail on behalf of # SPF record: Permit the box ('mx', see above) to send mail on behalf of
# the domain, and no one else. # the domain, and no one else.
# Skip if the user has set a custom SPF record. # Skip if the user has set a custom SPF record.
if not has_rec(None, "TXT", prefix="v=spf1 "): if not has_rec(None, "TXT", prefix="v=spf1 "):
records.append((None, "TXT", 'v=spf1 mx -all', "Recommended. Specifies that only the box is permitted to send @%s mail." % domain)) records.append((None, "TXT", 'v=spf1 mx -all', f"Recommended. Specifies that only the box is permitted to send @{domain} mail."))
# Append the DKIM TXT record to the zone as generated by OpenDKIM. # Append the DKIM TXT record to the zone as generated by OpenDKIM.
# Skip if the user has set a DKIM record already. # Skip if the user has set a DKIM record already.
@ -296,12 +294,12 @@ def build_zone(domain, domain_properties, additional_records, env, is_zone=True)
m = re.match(r'(\S+)\s+IN\s+TXT\s+\( ((?:"[^"]+"\s+)+)\)', orf.read(), re.S) m = re.match(r'(\S+)\s+IN\s+TXT\s+\( ((?:"[^"]+"\s+)+)\)', orf.read(), re.S)
val = "".join(re.findall(r'"([^"]+)"', m.group(2))) val = "".join(re.findall(r'"([^"]+)"', m.group(2)))
if not has_rec(m.group(1), "TXT", prefix="v=DKIM1; "): if not has_rec(m.group(1), "TXT", prefix="v=DKIM1; "):
records.append((m.group(1), "TXT", val, "Recommended. Provides a way for recipients to verify that this machine sent @%s mail." % domain)) records.append((m.group(1), "TXT", val, f"Recommended. Provides a way for recipients to verify that this machine sent @{domain} mail."))
# Append a DMARC record. # Append a DMARC record.
# Skip if the user has set a DMARC record already. # Skip if the user has set a DMARC record already.
if not has_rec("_dmarc", "TXT", prefix="v=DMARC1; "): if not has_rec("_dmarc", "TXT", prefix="v=DMARC1; "):
records.append(("_dmarc", "TXT", 'v=DMARC1; p=quarantine;', "Recommended. Specifies that mail that does not originate from the box but claims to be from @%s or which does not have a valid DKIM signature is suspect and should be quarantined by the recipient's mail system." % domain)) records.append(("_dmarc", "TXT", 'v=DMARC1; p=quarantine;', f"Recommended. Specifies that mail that does not originate from the box but claims to be from @{domain} or which does not have a valid DKIM signature is suspect and should be quarantined by the recipient's mail system."))
if domain_properties[domain]["user"]: if domain_properties[domain]["user"]:
# Add CardDAV/CalDAV SRV records on the non-primary hostname that points to the primary hostname # Add CardDAV/CalDAV SRV records on the non-primary hostname that points to the primary hostname
@ -364,9 +362,9 @@ def build_zone(domain, domain_properties, additional_records, env, is_zone=True)
# Mark this domain as not sending mail with hard-fail SPF and DMARC records. # Mark this domain as not sending mail with hard-fail SPF and DMARC records.
d = (qname+"." if qname else "") + domain d = (qname+"." if qname else "") + domain
if not has_rec(qname, "TXT", prefix="v=spf1 "): if not has_rec(qname, "TXT", prefix="v=spf1 "):
records.append((qname, "TXT", 'v=spf1 -all', "Recommended. Prevents use of this domain name for outbound mail by specifying that no servers are valid sources for mail from @%s. If you do send email from this domain name you should either override this record such that the SPF rule does allow the originating server, or, take the recommended approach and have the box handle mail for this domain (simply add any receiving alias at this domain name to make this machine treat the domain name as one of its mail domains)." % d)) records.append((qname, "TXT", 'v=spf1 -all', f"Recommended. Prevents use of this domain name for outbound mail by specifying that no servers are valid sources for mail from @{d}. If you do send email from this domain name you should either override this record such that the SPF rule does allow the originating server, or, take the recommended approach and have the box handle mail for this domain (simply add any receiving alias at this domain name to make this machine treat the domain name as one of its mail domains)."))
if not has_rec("_dmarc" + ("."+qname if qname else ""), "TXT", prefix="v=DMARC1; "): if not has_rec("_dmarc" + ("."+qname if qname else ""), "TXT", prefix="v=DMARC1; "):
records.append(("_dmarc" + ("."+qname if qname else ""), "TXT", 'v=DMARC1; p=reject;', "Recommended. Prevents use of this domain name for outbound mail by specifying that the SPF rule should be honoured for mail from @%s." % d)) records.append(("_dmarc" + ("."+qname if qname else ""), "TXT", 'v=DMARC1; p=reject;', f"Recommended. Prevents use of this domain name for outbound mail by specifying that the SPF rule should be honoured for mail from @{d}."))
# And with a null MX record (https://explained-from-first-principles.com/email/#null-mx-record) # And with a null MX record (https://explained-from-first-principles.com/email/#null-mx-record)
if not has_rec(qname, "MX"): if not has_rec(qname, "MX"):
@@ -458,14 +456,11 @@ def build_sshfp_records():
 	keys = sorted(keys.split("\n"))

 	for key in keys:
-		if key.strip() == "" or key[0] == "#": continue
+		if not key.strip() or key[0] == "#": continue
 		try:
 			_host, keytype, pubkey = key.split(" ")
-			yield "%d %d ( %s )" % (
-				algorithm_number[keytype],
-				2, # specifies we are using SHA-256 on next line
-				hashlib.sha256(base64.b64decode(pubkey)).hexdigest().upper(),
-			)
+			yield f"{algorithm_number[keytype]:d} {2 # specifies we are using SHA-256 on next line
+			:d} ( {hashlib.sha256(base64.b64decode(pubkey)).hexdigest().upper()} )"
 		except:
 			# Lots of things can go wrong. Don't let it disturb the DNS
 			# zone.
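
Note: the replacement f-string spans two source lines and carries a `#` comment inside a replacement field, which is only valid syntax on Python 3.12+ (PEP 701); older interpreters reject it. A hypothetical equivalent that parses everywhere (not part of this commit) would be:

    fingerprint = hashlib.sha256(base64.b64decode(pubkey)).hexdigest().upper()
    yield f"{algorithm_number[keytype]:d} 2 ( {fingerprint} )"  # 2 = SHA-256 fingerprint type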
@@ -592,7 +587,8 @@ def get_dns_zonefile(zone, env):
 		if zone == domain:
 			break
 	else:
-		raise ValueError("%s is not a domain name that corresponds to a zone." % zone)
+		msg = f"{zone} is not a domain name that corresponds to a zone."
+		raise ValueError(msg)

 	nsd_zonefile = "/etc/nsd/zones/" + fn
 	with open(nsd_zonefile, encoding="utf-8") as f:
@ -617,8 +613,8 @@ zone:
# and, if not a subnet, notifies to them. # and, if not a subnet, notifies to them.
for ipaddr in get_secondary_dns(additional_records, mode="xfr"): for ipaddr in get_secondary_dns(additional_records, mode="xfr"):
if "/" not in ipaddr: if "/" not in ipaddr:
nsdconf += "\n\tnotify: %s NOKEY" % (ipaddr) nsdconf += f"\n\tnotify: {ipaddr} NOKEY"
nsdconf += "\n\tprovide-xfr: %s NOKEY\n" % (ipaddr) nsdconf += f"\n\tprovide-xfr: {ipaddr} NOKEY\n"
# Check if the file is changing. If it isn't changing, # Check if the file is changing. If it isn't changing,
# return False to flag that no change was made. # return False to flag that no change was made.
@@ -717,9 +713,9 @@ def sign_zone(domain, zonefile, env):
 		# zonefile to sign
 		"/etc/nsd/zones/" + zonefile,
-	]
 		# keys to sign with (order doesn't matter -- it'll figure it out)
-		+ all_keys
+		*all_keys
+	]
 	)

 	# Create a DS record based on the patched-up key files. The DS record is specific to the
@@ -898,7 +894,8 @@ def set_custom_dns_record(qname, rtype, value, action, env):
 	else:
 		# No match.
 		if qname != "_secondary_nameserver":
-			raise ValueError("%s is not a domain name or a subdomain of a domain name managed by this box." % qname)
+			msg = f"{qname} is not a domain name or a subdomain of a domain name managed by this box."
+			raise ValueError(msg)

 	# validate rtype
 	rtype = rtype.upper()
@@ -910,8 +907,12 @@ def set_custom_dns_record(qname, rtype, value, action, env):
 	if rtype in {"A", "AAAA"}:
 		if value != "local": # "local" is a special flag for us
 			v = ipaddress.ip_address(value) # raises a ValueError if there's a problem
-			if rtype == "A" and not isinstance(v, ipaddress.IPv4Address): raise ValueError("That's an IPv6 address.")
-			if rtype == "AAAA" and not isinstance(v, ipaddress.IPv6Address): raise ValueError("That's an IPv4 address.")
+			if rtype == "A" and not isinstance(v, ipaddress.IPv4Address):
+				msg = "That's an IPv6 address."
+				raise ValueError(msg)
+			if rtype == "AAAA" and not isinstance(v, ipaddress.IPv6Address):
+				msg = "That's an IPv4 address."
+				raise ValueError(msg)
 	elif rtype in {"CNAME", "NS"}:
 		if rtype == "NS" and qname == zone:
 			msg = "NS records can only be set for subdomains."
@ -919,7 +920,7 @@ def set_custom_dns_record(qname, rtype, value, action, env):
# ensure value has a trailing dot # ensure value has a trailing dot
if not value.endswith("."): if not value.endswith("."):
value = value + "." value += "."
if not re.search(DOMAIN_RE, value): if not re.search(DOMAIN_RE, value):
msg = "Invalid value." msg = "Invalid value."
@@ -928,7 +929,8 @@ def set_custom_dns_record(qname, rtype, value, action, env):
 		# anything goes
 		pass
 	else:
-		raise ValueError("Unknown record type '%s'." % rtype)
+		msg = f"Unknown record type '{rtype}'."
+		raise ValueError(msg)

 	# load existing config
 	config = list(get_custom_dns_config(env))
@@ -1039,7 +1041,8 @@ def set_secondary_dns(hostnames, env):
 			try:
 				resolver.resolve(item, "AAAA")
 			except (dns.resolver.NoNameservers, dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.Timeout):
-				raise ValueError("Could not resolve the IP address of %s." % item)
+				msg = f"Could not resolve the IP address of {item}."
+				raise ValueError(msg)
 		else:
 			# Validate IP address.
 			try:

@@ -1048,7 +1051,8 @@ def set_secondary_dns(hostnames, env):
 				else:
 					ipaddress.ip_address(item[4:]) # raises a ValueError if there's a problem
 			except ValueError:
-				raise ValueError("'%s' is not an IPv4 or IPv6 address or subnet." % item[4:])
+				msg = f"'{item[4:]}' is not an IPv4 or IPv6 address or subnet."
+				raise ValueError(msg)

 	# Set.
 	set_custom_dns_record("_secondary_nameserver", "A", " ".join(hostnames), "set", env)

View File

@ -28,7 +28,7 @@ admin_addr = "administrator@" + env['PRIMARY_HOSTNAME']
content = sys.stdin.read().strip() content = sys.stdin.read().strip()
# If there's nothing coming in, just exit. # If there's nothing coming in, just exit.
if content == "": if not content:
sys.exit(0) sys.exit(0)
# create MIME message # create MIME message

View File

@@ -71,9 +71,8 @@ def scan_files(collector):
 		if not os.path.exists(fn):
 			continue
-		elif fn[-3:] == '.gz':
-			tmp_file = tempfile.NamedTemporaryFile()
-			with gzip.open(fn, 'rb') as f:
+		if fn[-3:] == '.gz':
+			with tempfile.NamedTemporaryFile(delete=False) as tmp_file, gzip.open(fn, 'rb') as f:
 				shutil.copyfileobj(f, tmp_file)

 		if VERBOSE:

@@ -88,6 +87,8 @@ def scan_files(collector):
 		else:
 			stop_scan = False

+		if tmp_file is not None:
+			os.remove(tmp_file.name)


 def scan_mail_log(env):
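
With `delete=False` the temporary file survives the `with` block, so the explicit `os.remove(tmp_file.name)` added above is what keeps the decompressed copies from accumulating. A minimal standalone sketch of the pattern (not the diff's code):

    import os, tempfile

    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"data")
    path = tmp.name   # file still exists here and can be reopened
    os.remove(path)   # caller is responsible for cleanup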
@ -302,7 +303,7 @@ def scan_mail_log(env):
for date, sender, message in user_data["blocked"]: for date, sender, message in user_data["blocked"]:
if len(sender) > 64: if len(sender) > 64:
sender = sender[:32] + "" + sender[-32:] sender = sender[:32] + "" + sender[-32:]
user_rejects.extend((f'{date} - {sender} ', ' %s' % message)) user_rejects.extend((f'{date} - {sender} ', f' {message}'))
rejects.append(user_rejects) rejects.append(user_rejects)
print_user_table( print_user_table(
@ -355,7 +356,7 @@ def scan_mail_log_line(line, collector):
if date > END_DATE: if date > END_DATE:
# Don't process, and halt # Don't process, and halt
return False return False
elif date < START_DATE: if date < START_DATE:
# Don't process, but continue # Don't process, but continue
return True return True
@ -391,7 +392,7 @@ def scan_postgrey_line(date, log, collector):
""" Scan a postgrey log line and extract interesting data """ """ Scan a postgrey log line and extract interesting data """
m = re.match(r"action=(greylist|pass), reason=(.*?), (?:delay=\d+, )?client_name=(.*), " m = re.match(r"action=(greylist|pass), reason=(.*?), (?:delay=\d+, )?client_name=(.*), "
"client_address=(.*), sender=(.*), recipient=(.*)", r"client_address=(.*), sender=(.*), recipient=(.*)",
log) log)
if m: if m:
@ -423,7 +424,7 @@ def scan_postfix_smtpd_line(date, log, collector):
# Check if the incoming mail was rejected # Check if the incoming mail was rejected
m = re.match("NOQUEUE: reject: RCPT from .*?: (.*?); from=<(.*?)> to=<(.*?)>", log) m = re.match(r"NOQUEUE: reject: RCPT from .*?: (.*?); from=<(.*?)> to=<(.*?)>", log)
if m: if m:
message, sender, user = m.groups() message, sender, user = m.groups()
@ -467,7 +468,7 @@ def scan_postfix_smtpd_line(date, log, collector):
def scan_dovecot_login_line(date, log, collector, protocol_name): def scan_dovecot_login_line(date, log, collector, protocol_name):
""" Scan a dovecot login log line and extract interesting data """ """ Scan a dovecot login log line and extract interesting data """
m = re.match("Info: Login: user=<(.*?)>, method=PLAIN, rip=(.*?),", log) m = re.match(r"Info: Login: user=<(.*?)>, method=PLAIN, rip=(.*?),", log)
if m: if m:
# TODO: CHECK DIT # TODO: CHECK DIT
@ -495,7 +496,7 @@ def add_login(user, date, protocol_name, host, collector):
data["latest"] = date data["latest"] = date
data["totals_by_protocol"][protocol_name] += 1 data["totals_by_protocol"][protocol_name] += 1
data["totals_by_protocol_and_host"][(protocol_name, host)] += 1 data["totals_by_protocol_and_host"][protocol_name, host] += 1
if host not in {"127.0.0.1", "::1"} or True: if host not in {"127.0.0.1", "::1"} or True:
data["activity-by-hour"][protocol_name][date.hour] += 1 data["activity-by-hour"][protocol_name][date.hour] += 1
@@ -608,7 +609,8 @@ def valid_date(string):
 	try:
 		date = dateutil.parser.parse(string)
 	except ValueError:
-		raise argparse.ArgumentTypeError("Unrecognized date and/or time '%s'" % string)
+		msg = f"Unrecognized date and/or time '{string}'"
+		raise argparse.ArgumentTypeError(msg)
 	return date
@ -618,12 +620,12 @@ def print_time_table(labels, data, do_print=True):
labels.insert(0, "hour") labels.insert(0, "hour")
data.insert(0, [str(h) for h in range(24)]) data.insert(0, [str(h) for h in range(24)])
temp = "{:<%d} " % max(len(l) for l in labels) temp = f"{{:<{max(len(l) for l in labels):d}}} "
lines = [temp.format(label) for label in labels] lines = [temp.format(label) for label in labels]
for h in range(24): for h in range(24):
max_len = max(len(str(d[h])) for d in data) max_len = max(len(str(d[h])) for d in data)
base = "{:>%d} " % max(2, max_len) base = f"{{:>{max(2, max_len):d}}} "
for i, d in enumerate(data): for i, d in enumerate(data):
lines[i] += base.format(d[h]) lines[i] += base.format(d[h])
@@ -634,8 +636,7 @@ def print_time_table(labels, data, do_print=True):
 	if do_print:
 		print("\n".join(lines))
 		return None
-	else:
-		return lines
+	return lines


 def print_user_table(users, data=None, sub_data=None, activity=None, latest=None, earliest=None,
@ -670,7 +671,7 @@ def print_user_table(users, data=None, sub_data=None, activity=None, latest=None
col_str = f"{d[row]!s:<20}" col_str = f"{d[row]!s:<20}"
col_left[col] = True col_left[col] = True
else: else:
temp = "{:>%s}" % max(5, len(l) + 1, len(str(d[row])) + 1) temp = f"{{:>{max(5, len(l) + 1, len(str(d[row])) + 1)}}}"
col_str = temp.format(str(d[row])) col_str = temp.format(str(d[row]))
col_widths[col] = max(col_widths[col], len(col_str)) col_widths[col] = max(col_widths[col], len(col_str))
line += col_str line += col_str
@ -679,7 +680,7 @@ def print_user_table(users, data=None, sub_data=None, activity=None, latest=None
data_accum[col] += d[row] data_accum[col] += d[row]
try: try:
if None not in [latest, earliest]: # noqa PLR6201 if None not in [latest, earliest]: # noqa: PLR6201
vert_pos = len(line) vert_pos = len(line)
e = earliest[row] e = earliest[row]
l = latest[row] l = latest[row]
@ -707,10 +708,10 @@ def print_user_table(users, data=None, sub_data=None, activity=None, latest=None
if sub_data is not None: if sub_data is not None:
for l, d in sub_data: for l, d in sub_data:
if d[row]: if d[row]:
lines.extend(('', '%s' % l, '├─%s' % (len(l) * ''), '')) lines.extend(('', f'{l}', '├─{}'.format(len(l) * ''), ''))
max_len = 0 max_len = 0
for v in list(d[row]): for v in list(d[row]):
lines.append("%s" % v) lines.append(f"{v}")
max_len = max(max_len, len(v)) max_len = max(max_len, len(v))
lines.append("" + (max_len + 1) * "") lines.append("" + (max_len + 1) * "")
@ -732,7 +733,7 @@ def print_user_table(users, data=None, sub_data=None, activity=None, latest=None
else: else:
header += l.rjust(max(5, len(l) + 1, col_widths[col])) header += l.rjust(max(5, len(l) + 1, col_widths[col]))
if None not in [latest, earliest]: # noqa PLR6201 if None not in [latest, earliest]: # noqa: PLR6201
header += " │ timespan " header += " │ timespan "
lines.insert(0, header.rstrip()) lines.insert(0, header.rstrip())
@ -753,11 +754,11 @@ def print_user_table(users, data=None, sub_data=None, activity=None, latest=None
data_accum = [numstr(a) for a in data_accum] data_accum = [numstr(a) for a in data_accum]
footer = str_temp.format("Totals:" if do_accum else " ") footer = str_temp.format("Totals:" if do_accum else " ")
for row, (l, _) in enumerate(data): for row, (l, _) in enumerate(data):
temp = "{:>%d}" % max(5, len(l) + 1) temp = f"{{:>{max(5, len(l) + 1):d}}}"
footer += temp.format(data_accum[row]) footer += temp.format(data_accum[row])
try: try:
if None not in [latest, earliest]: # noqa PLR6201 if None not in [latest, earliest]: # noqa: PLR6201
max_l = max(latest) max_l = max(latest)
min_e = min(earliest) min_e = min(earliest)
timespan = relativedelta(max_l, min_e) timespan = relativedelta(max_l, min_e)

View File

@ -15,6 +15,7 @@ import subprocess
import utils import utils
from email_validator import validate_email as validate_email_, EmailNotValidError from email_validator import validate_email as validate_email_, EmailNotValidError
import idna import idna
import operator
def validate_email(email, mode=None): def validate_email(email, mode=None):
# Checks that an email address is syntactically valid. Returns True/False. # Checks that an email address is syntactically valid. Returns True/False.
@@ -94,8 +95,7 @@ def open_database(env, with_connection=False):
 	conn = sqlite3.connect(env["STORAGE_ROOT"] + "/mail/users.sqlite")
 	if not with_connection:
 		return conn.cursor()
-	else:
-		return conn, conn.cursor()
+	return conn, conn.cursor()

 def get_mail_users(env):
 	# Returns a flat, sorted list of all user accounts.
@ -285,7 +285,7 @@ def get_mail_aliases_ex(env):
# Sort aliases within each domain first by required-ness then lexicographically by address. # Sort aliases within each domain first by required-ness then lexicographically by address.
for domain in domains: for domain in domains:
domain["aliases"].sort(key = lambda alias : (alias["auto"], alias["address"])) domain["aliases"].sort(key = operator.itemgetter("auto", "address"))
return domains return domains
def get_domain(emailaddr, as_unicode=True): def get_domain(emailaddr, as_unicode=True):
@ -301,7 +301,7 @@ def get_domain(emailaddr, as_unicode=True):
pass pass
return ret return ret
def get_mail_domains(env, filter_aliases=lambda alias : True, users_only=False): def get_mail_domains(env, filter_aliases=lambda _alias : True, users_only=False):
# Returns the domain names (IDNA-encoded) of all of the email addresses # Returns the domain names (IDNA-encoded) of all of the email addresses
# configured on the system. If users_only is True, only return domains # configured on the system. If users_only is True, only return domains
# with email addresses that correspond to user accounts. Exclude Unicode # with email addresses that correspond to user accounts. Exclude Unicode
@ -314,13 +314,13 @@ def get_mail_domains(env, filter_aliases=lambda alias : True, users_only=False):
def add_mail_user(email, pw, privs, quota, env): def add_mail_user(email, pw, privs, quota, env):
# validate email # validate email
if email.strip() == "": if not email.strip():
return ("No email address provided.", 400) return ("No email address provided.", 400)
elif not validate_email(email): if not validate_email(email):
return ("Invalid email address.", 400) return ("Invalid email address.", 400)
elif not validate_email(email, mode='user'): if not validate_email(email, mode='user'):
return ("User account email addresses may only use the lowercase ASCII letters a-z, the digits 0-9, underscore (_), hyphen (-), and period (.).", 400) return ("User account email addresses may only use the lowercase ASCII letters a-z, the digits 0-9, underscore (_), hyphen (-), and period (.).", 400)
elif is_dcv_address(email) and len(get_mail_users(env)) > 0: if is_dcv_address(email) and len(get_mail_users(env)) > 0:
# Make domain control validation hijacking a little harder to mess up by preventing the usual # Make domain control validation hijacking a little harder to mess up by preventing the usual
# addresses used for DCV from being user accounts. Except let it be the first account because # addresses used for DCV from being user accounts. Except let it be the first account because
# during box setup the user won't know the rules. # during box setup the user won't know the rules.
@ -330,7 +330,7 @@ def add_mail_user(email, pw, privs, quota, env):
validate_password(pw) validate_password(pw)
# validate privileges # validate privileges
if privs is None or privs.strip() == "": if privs is None or not privs.strip():
privs = [] privs = []
else: else:
privs = privs.split("\n") privs = privs.split("\n")
@ -378,7 +378,7 @@ def set_mail_password(email, pw, env):
conn, c = open_database(env, with_connection=True) conn, c = open_database(env, with_connection=True)
c.execute("UPDATE users SET password=? WHERE email=?", (pw, email)) c.execute("UPDATE users SET password=? WHERE email=?", (pw, email))
if c.rowcount != 1: if c.rowcount != 1:
return ("That's not a user (%s)." % email, 400) return (f"That's not a user ({email}).", 400)
conn.commit() conn.commit()
return "OK" return "OK"
@@ -446,7 +446,8 @@ def get_mail_password(email, env):
 	c.execute('SELECT password FROM users WHERE email=?', (email,))
 	rows = c.fetchall()
 	if len(rows) != 1:
-		raise ValueError("That's not a user (%s)." % email)
+		msg = f"That's not a user ({email})."
+		raise ValueError(msg)
 	return rows[0][0]

 def remove_mail_user(email, env):
@ -454,14 +455,14 @@ def remove_mail_user(email, env):
conn, c = open_database(env, with_connection=True) conn, c = open_database(env, with_connection=True)
c.execute("DELETE FROM users WHERE email=?", (email,)) c.execute("DELETE FROM users WHERE email=?", (email,))
if c.rowcount != 1: if c.rowcount != 1:
return ("That's not a user (%s)." % email, 400) return (f"That's not a user ({email}).", 400)
conn.commit() conn.commit()
# Update things in case any domains are removed. # Update things in case any domains are removed.
return kick(env, "mail user removed") return kick(env, "mail user removed")
def parse_privs(value): def parse_privs(value):
return [p for p in value.split("\n") if p.strip() != ""] return [p for p in value.split("\n") if p.strip()]
def get_mail_user_privileges(email, env, empty_on_error=False): def get_mail_user_privileges(email, env, empty_on_error=False):
# get privs # get privs
@ -470,12 +471,12 @@ def get_mail_user_privileges(email, env, empty_on_error=False):
rows = c.fetchall() rows = c.fetchall()
if len(rows) != 1: if len(rows) != 1:
if empty_on_error: return [] if empty_on_error: return []
return ("That's not a user (%s)." % email, 400) return (f"That's not a user ({email}).", 400)
return parse_privs(rows[0][0]) return parse_privs(rows[0][0])
def validate_privilege(priv): def validate_privilege(priv):
if "\n" in priv or priv.strip() == "": if "\n" in priv or not priv.strip():
return ("That's not a valid privilege (%s)." % priv, 400) return (f"That's not a valid privilege ({priv}).", 400)
return None return None
def add_remove_mail_user_privilege(email, priv, action, env): def add_remove_mail_user_privilege(email, priv, action, env):
@ -515,10 +516,10 @@ def add_mail_alias(address, forwards_to, permitted_senders, env, update_if_exist
# validate address # validate address
address = address.strip() address = address.strip()
if address == "": if not address:
return ("No email address provided.", 400) return ("No email address provided.", 400)
if not validate_email(address, mode='alias'): if not validate_email(address, mode='alias'):
return ("Invalid email address (%s)." % address, 400) return (f"Invalid email address ({address}).", 400)
# validate forwards_to # validate forwards_to
validated_forwards_to = [] validated_forwards_to = []
@ -542,12 +543,12 @@ def add_mail_alias(address, forwards_to, permitted_senders, env, update_if_exist
for line in forwards_to.split("\n"): for line in forwards_to.split("\n"):
for email in line.split(","): for email in line.split(","):
email = email.strip() email = email.strip()
if email == "": continue if not email: continue
email = sanitize_idn_email_address(email) # Unicode => IDNA email = sanitize_idn_email_address(email) # Unicode => IDNA
# Strip any +tag from email alias and check privileges # Strip any +tag from email alias and check privileges
privileged_email = re.sub(r"(?=\+)[^@]*(?=@)",'',email) privileged_email = re.sub(r"(?=\+)[^@]*(?=@)",'',email)
if not validate_email(email): if not validate_email(email):
return ("Invalid receiver email address (%s)." % email, 400) return (f"Invalid receiver email address ({email}).", 400)
if is_dcv_source and not is_dcv_address(email) and "admin" not in get_mail_user_privileges(privileged_email, env, empty_on_error=True): if is_dcv_source and not is_dcv_address(email) and "admin" not in get_mail_user_privileges(privileged_email, env, empty_on_error=True):
# Make domain control validation hijacking a little harder to mess up by # Make domain control validation hijacking a little harder to mess up by
# requiring aliases for email addresses typically used in DCV to forward # requiring aliases for email addresses typically used in DCV to forward
@ -565,9 +566,9 @@ def add_mail_alias(address, forwards_to, permitted_senders, env, update_if_exist
for line in permitted_senders.split("\n"): for line in permitted_senders.split("\n"):
for login in line.split(","): for login in line.split(","):
login = login.strip() login = login.strip()
if login == "": continue if not login: continue
if login not in valid_logins: if login not in valid_logins:
return ("Invalid permitted sender: %s is not a user on this system." % login, 400) return (f"Invalid permitted sender: {login} is not a user on this system.", 400)
validated_permitted_senders.append(login) validated_permitted_senders.append(login)
# Make sure the alias has either a forwards_to or a permitted_sender. # Make sure the alias has either a forwards_to or a permitted_sender.
@@ -586,10 +587,9 @@ def add_mail_alias(address, forwards_to, permitted_senders, env, update_if_exist
 		return_status = "alias added"
 	except sqlite3.IntegrityError:
 		if not update_if_exists:
-			return ("Alias already exists (%s)." % address, 400)
-		else:
-			c.execute("UPDATE aliases SET destination = ?, permitted_senders = ? WHERE source = ?", (forwards_to, permitted_senders, address))
-			return_status = "alias updated"
+			return (f"Alias already exists ({address}).", 400)
+		c.execute("UPDATE aliases SET destination = ?, permitted_senders = ? WHERE source = ?", (forwards_to, permitted_senders, address))
+		return_status = "alias updated"

 	conn.commit()
@ -606,7 +606,7 @@ def remove_mail_alias(address, env, do_kick=True):
conn, c = open_database(env, with_connection=True) conn, c = open_database(env, with_connection=True)
c.execute("DELETE FROM aliases WHERE source=?", (address,)) c.execute("DELETE FROM aliases WHERE source=?", (address,))
if c.rowcount != 1: if c.rowcount != 1:
return ("That's not an alias (%s)." % address, 400) return (f"That's not an alias ({address}).", 400)
conn.commit() conn.commit()
if do_kick: if do_kick:
@ -703,11 +703,11 @@ def kick(env, mail_result=None):
from web_update import do_web_update from web_update import do_web_update
results.append( do_web_update(env) ) results.append( do_web_update(env) )
return "".join(s for s in results if s != "") return "".join(s for s in results if s)
def validate_password(pw): def validate_password(pw):
# validate password # validate password
if pw.strip() == "": if not pw.strip():
msg = "No password provided." msg = "No password provided."
raise ValueError(msg) raise ValueError(msg)
if len(pw) < 8: if len(pw) < 8:
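The recurring change in this file replaces %-style formatting with f-strings and comparisons like x.strip() == "" with plain truthiness checks. A minimal, self-contained sketch of both patterns (the function and variable names are illustrative, not from the codebase):

def describe_alias(address, forwards_to):
    # An empty or whitespace-only string is falsy once stripped,
    # so "not address.strip()" covers the same cases as address.strip() == "".
    if not address.strip():
        return "No email address provided."
    # f-strings replace the older "%s" % value interpolation.
    return f"Alias {address} forwards to {forwards_to}."

print(describe_alias("   ", "user@example.com"))              # No email address provided.
print(describe_alias("hi@example.com", "user@example.com"))   # Alias hi@example.com forwards to user@example.com.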

@ -10,7 +10,9 @@ from mailconfig import open_database
def get_user_id(email, c): def get_user_id(email, c):
c.execute('SELECT id FROM users WHERE email=?', (email,)) c.execute('SELECT id FROM users WHERE email=?', (email,))
r = c.fetchone() r = c.fetchone()
if not r: raise ValueError("User does not exist.") if not r:
msg = "User does not exist."
raise ValueError(msg)
return r[0] return r[0]
def get_mfa_state(email, env): def get_mfa_state(email, env):
@ -68,7 +70,7 @@ def disable_mfa(email, mfa_id, env):
return c.rowcount > 0 return c.rowcount > 0
def validate_totp_secret(secret): def validate_totp_secret(secret):
if not isinstance(secret, str) or secret.strip() == "": if not isinstance(secret, str) or not secret.strip():
msg = "No secret provided." msg = "No secret provided."
raise ValueError(msg) raise ValueError(msg)
if len(secret) != 32: if len(secret) != 32:
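These hunks bind the error message to a variable before raising, the form preferred by the EM rules selected in the new pyproject.toml further down. A short sketch, with a simplified check standing in for the real TOTP validation:

def validate_totp_secret(secret):
    # Binding the message first keeps the string literal out of the raise statement.
    if not isinstance(secret, str) or not secret.strip():
        msg = "No secret provided."
        raise ValueError(msg)
    return secret

validate_totp_secret("JBSWY3DPEHPK3PXP")   # returns the secret unchanged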

@ -63,9 +63,7 @@ def get_ssl_certificates(env):
if isinstance(pem, Certificate): if isinstance(pem, Certificate):
certificates.append({ "filename": fn, "cert": pem }) certificates.append({ "filename": fn, "cert": pem })
# It is a private key # It is a private key
elif (isinstance(pem, rsa.RSAPrivateKey) elif (isinstance(pem, (rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey))):
or isinstance(pem, dsa.DSAPrivateKey)
or isinstance(pem, ec.EllipticCurvePrivateKey)):
private_keys[pem.public_key().public_numbers()] = { "filename": fn, "key": pem } private_keys[pem.public_key().public_numbers()] = { "filename": fn, "key": pem }
@ -160,14 +158,13 @@ def get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=False
wildcard_domain = re.sub(r"^[^\.]+", "*", domain) wildcard_domain = re.sub(r"^[^\.]+", "*", domain)
if domain in ssl_certificates: if domain in ssl_certificates:
return ssl_certificates[domain] return ssl_certificates[domain]
elif wildcard_domain in ssl_certificates: if wildcard_domain in ssl_certificates:
return ssl_certificates[wildcard_domain] return ssl_certificates[wildcard_domain]
elif not allow_missing_cert: if not allow_missing_cert:
# No valid certificate is available for this domain! Return default files. # No valid certificate is available for this domain! Return default files.
return system_certificate return system_certificate
else: # No valid certificate is available for this domain.
# No valid certificate is available for this domain. return None
return None
# PROVISIONING CERTIFICATES FROM LETSENCRYPT # PROVISIONING CERTIFICATES FROM LETSENCRYPT
@ -516,9 +513,11 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
try: try:
ssl_cert_chain = load_cert_chain(ssl_certificate) ssl_cert_chain = load_cert_chain(ssl_certificate)
cert = load_pem(ssl_cert_chain[0]) cert = load_pem(ssl_cert_chain[0])
if not isinstance(cert, Certificate): raise ValueError("This is not a certificate file.") if not isinstance(cert, Certificate):
msg = "This is not a certificate file."
raise ValueError(msg)
except ValueError as e: except ValueError as e:
return ("There is a problem with the certificate file: %s" % str(e), None) return (f"There is a problem with the certificate file: {e!s}", None)
# First check that the domain name is one of the names allowed by # First check that the domain name is one of the names allowed by
# the certificate. # the certificate.
@ -530,8 +529,7 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
# should work in normal cases). # should work in normal cases).
wildcard_domain = re.sub(r"^[^\.]+", "*", domain) wildcard_domain = re.sub(r"^[^\.]+", "*", domain)
if domain not in certificate_names and wildcard_domain not in certificate_names: if domain not in certificate_names and wildcard_domain not in certificate_names:
return ("The certificate is for the wrong domain name. It is for %s." return ("The certificate is for the wrong domain name. It is for {}.".format(", ".join(sorted(certificate_names))), None)
% ", ".join(sorted(certificate_names)), None)
# Second, check that the certificate matches the private key. # Second, check that the certificate matches the private key.
if ssl_private_key is not None: if ssl_private_key is not None:
@ -544,10 +542,10 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
if (not isinstance(priv_key, rsa.RSAPrivateKey) if (not isinstance(priv_key, rsa.RSAPrivateKey)
and not isinstance(priv_key, dsa.DSAPrivateKey) and not isinstance(priv_key, dsa.DSAPrivateKey)
and not isinstance(priv_key, ec.EllipticCurvePrivateKey)): and not isinstance(priv_key, ec.EllipticCurvePrivateKey)):
return ("The private key file %s is not a private key file." % ssl_private_key, None) return (f"The private key file {ssl_private_key} is not a private key file.", None)
if priv_key.public_key().public_numbers() != cert.public_key().public_numbers(): if priv_key.public_key().public_numbers() != cert.public_key().public_numbers():
return ("The certificate does not correspond to the private key at %s." % ssl_private_key, None) return (f"The certificate does not correspond to the private key at {ssl_private_key}.", None)
# We could also use the openssl command line tool to get the modulus # We could also use the openssl command line tool to get the modulus
# listed in each file. The output of each command below looks like "Modulus=XXXXX". # listed in each file. The output of each command below looks like "Modulus=XXXXX".
@ -591,34 +589,33 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
# Certificate is self-signed. Probably we detected this above. # Certificate is self-signed. Probably we detected this above.
return ("SELF-SIGNED", None) return ("SELF-SIGNED", None)
elif retcode != 0: if retcode != 0:
if "unable to get local issuer certificate" in verifyoutput: if "unable to get local issuer certificate" in verifyoutput:
return ("The certificate is missing an intermediate chain or the intermediate chain is incorrect or incomplete. (%s)" % verifyoutput, None) return (f"The certificate is missing an intermediate chain or the intermediate chain is incorrect or incomplete. ({verifyoutput})", None)
# There is some unknown problem. Return the `openssl verify` raw output. # There is some unknown problem. Return the `openssl verify` raw output.
return ("There is a problem with the certificate.", verifyoutput.strip()) return ("There is a problem with the certificate.", verifyoutput.strip())
# `openssl verify` returned a zero exit status so the cert is currently
# good.
# But is it expiring soon?
cert_expiration_date = cert.not_valid_after
ndays = (cert_expiration_date-now).days
if not rounded_time or ndays <= 10:
# Yikes better renew soon!
expiry_info = f"The certificate expires in {ndays:d} days on {cert_expiration_date.date().isoformat()}."
else: else:
# `openssl verify` returned a zero exit status so the cert is currently # We'll renew it with Lets Encrypt.
# good. expiry_info = f"The certificate expires on {cert_expiration_date.date().isoformat()}."
# But is it expiring soon? if warn_if_expiring_soon and ndays <= warn_if_expiring_soon:
cert_expiration_date = cert.not_valid_after # Warn on day 10 to give 4 days for us to automatically renew the
ndays = (cert_expiration_date-now).days # certificate, which occurs on day 14.
if not rounded_time or ndays <= 10: return ("The certificate is expiring soon: " + expiry_info, None)
# Yikes better renew soon!
expiry_info = "The certificate expires in %d days on %s." % (ndays, cert_expiration_date.date().isoformat())
else:
# We'll renew it with Lets Encrypt.
expiry_info = "The certificate expires on %s." % cert_expiration_date.date().isoformat()
if warn_if_expiring_soon and ndays <= warn_if_expiring_soon: # Return the special OK code.
# Warn on day 10 to give 4 days for us to automatically renew the return ("OK", expiry_info)
# certificate, which occurs on day 14.
return ("The certificate is expiring soon: " + expiry_info, None)
# Return the special OK code.
return ("OK", expiry_info)
def load_cert_chain(pemfile): def load_cert_chain(pemfile):
# A certificate .pem file may contain a chain of certificates. # A certificate .pem file may contain a chain of certificates.
@ -638,7 +635,7 @@ def load_pem(pem):
from cryptography.x509 import load_pem_x509_certificate from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
pem_type = re.match(b"-+BEGIN (.*?)-+[\r\n]", pem) pem_type = re.match(br"-+BEGIN (.*?)-+[\r\n]", pem)
if pem_type is None: if pem_type is None:
msg = "File is not a valid PEM-formatted file." msg = "File is not a valid PEM-formatted file."
raise ValueError(msg) raise ValueError(msg)
@ -672,13 +669,11 @@ def get_certificate_domains(cert):
def idna_decode_dns_name(dns_name): def idna_decode_dns_name(dns_name):
if dns_name.startswith("*."): if dns_name.startswith("*."):
return "*." + idna.encode(dns_name[2:]).decode('ascii') return "*." + idna.encode(dns_name[2:]).decode('ascii')
else: return idna.encode(dns_name).decode('ascii')
return idna.encode(dns_name).decode('ascii')
try: try:
sans = cert.extensions.get_extension_for_oid(OID_SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(DNSName) sans = cert.extensions.get_extension_for_oid(OID_SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(DNSName)
for san in sans: names.update(idna_decode_dns_name(san) for san in sans)
names.add(idna_decode_dns_name(san))
except ExtensionNotFound: except ExtensionNotFound:
pass pass
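The private-key check above folds three isinstance() calls into one, since isinstance accepts a tuple of classes. The same idea with builtin types (the cryptography key classes used by the real code are left out so the sketch stays runnable):

def classify(value):
    # One isinstance call with a tuple of types replaces a chain of
    # isinstance(value, A) or isinstance(value, B) or isinstance(value, C).
    if isinstance(value, (int, float, complex)):
        return "number"
    return "other"

print(classify(3.14))    # number
print(classify("pem"))   # other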

@ -105,12 +105,12 @@ def check_service(i, service, env):
s.settimeout(1) s.settimeout(1)
try: try:
s.connect((ip, service["port"])) s.connect((ip, service["port"]))
return True
except OSError: except OSError:
# timed out or some other odd error # timed out or some other odd error
return False return False
finally: finally:
s.close() s.close()
return True
if service["public"]: if service["public"]:
# Service should be publicly accessible. # Service should be publicly accessible.
@ -122,15 +122,15 @@ def check_service(i, service, env):
# IPv4 ok but IPv6 failed. Try the PRIVATE_IPV6 address to see if the service is bound to the interface. # IPv4 ok but IPv6 failed. Try the PRIVATE_IPV6 address to see if the service is bound to the interface.
elif service["port"] != 53 and try_connect(env["PRIVATE_IPV6"]): elif service["port"] != 53 and try_connect(env["PRIVATE_IPV6"]):
output.print_error("%s is running (and available over IPv4 and the local IPv6 address), but it is not publicly accessible at %s:%d." % (service['name'], env['PUBLIC_IPV6'], service['port'])) output.print_error("{} is running (and available over IPv4 and the local IPv6 address), but it is not publicly accessible at {}:{:d}.".format(service['name'], env['PUBLIC_IPV6'], service['port']))
else: else:
output.print_error("%s is running and available over IPv4 but is not accessible over IPv6 at %s port %d." % (service['name'], env['PUBLIC_IPV6'], service['port'])) output.print_error("{} is running and available over IPv4 but is not accessible over IPv6 at {} port {:d}.".format(service['name'], env['PUBLIC_IPV6'], service['port']))
# IPv4 failed. Try the private IP to see if the service is running but not accessible (except DNS because a different service runs on the private IP). # IPv4 failed. Try the private IP to see if the service is running but not accessible (except DNS because a different service runs on the private IP).
elif service["port"] != 53 and try_connect("127.0.0.1"): elif service["port"] != 53 and try_connect("127.0.0.1"):
output.print_error("%s is running but is not publicly accessible at %s:%d." % (service['name'], env['PUBLIC_IP'], service['port'])) output.print_error("{} is running but is not publicly accessible at {}:{:d}.".format(service['name'], env['PUBLIC_IP'], service['port']))
else: else:
output.print_error("%s is not running (port %d)." % (service['name'], service['port'])) output.print_error("{} is not running (port {:d}).".format(service['name'], service['port']))
# Why is nginx not running? # Why is nginx not running?
if not running and service["port"] in {80, 443}: if not running and service["port"] in {80, 443}:
@ -140,7 +140,7 @@ def check_service(i, service, env):
elif try_connect("127.0.0.1"): elif try_connect("127.0.0.1"):
running = True running = True
else: else:
output.print_error("%s is not running (port %d)." % (service['name'], service['port'])) output.print_error("{} is not running (port {:d}).".format(service['name'], service['port']))
# Flag if local DNS is not running. # Flag if local DNS is not running.
if not running and service["port"] == 53 and service["public"] is False: if not running and service["port"] == 53 and service["public"] is False:
@ -209,7 +209,7 @@ def check_software_updates(env, output):
elif len(pkgs) == 0: elif len(pkgs) == 0:
output.print_ok("System software is up to date.") output.print_ok("System software is up to date.")
else: else:
output.print_error("There are %d software packages that can be updated." % len(pkgs)) output.print_error(f"There are {len(pkgs):d} software packages that can be updated.")
for p in pkgs: for p in pkgs:
output.print_line("{} ({})".format(p["package"], p["version"])) output.print_line("{} ({})".format(p["package"], p["version"]))
@ -223,7 +223,7 @@ def check_free_disk_space(rounded_values, env, output):
st = os.statvfs(env['STORAGE_ROOT']) st = os.statvfs(env['STORAGE_ROOT'])
bytes_total = st.f_blocks * st.f_frsize bytes_total = st.f_blocks * st.f_frsize
bytes_free = st.f_bavail * st.f_frsize bytes_free = st.f_bavail * st.f_frsize
disk_msg = "The disk has %.2f GB space remaining." % (bytes_free/1024.0/1024.0/1024.0) disk_msg = f"The disk has {bytes_free/1024.0/1024.0/1024.0:.2f} GB space remaining."
if bytes_free > .3 * bytes_total: if bytes_free > .3 * bytes_total:
if rounded_values: disk_msg = "The disk has more than 30% free space." if rounded_values: disk_msg = "The disk has more than 30% free space."
output.print_ok(disk_msg) output.print_ok(disk_msg)
@ -248,7 +248,7 @@ def check_free_disk_space(rounded_values, env, output):
def check_free_memory(rounded_values, env, output): def check_free_memory(rounded_values, env, output):
# Check free memory. # Check free memory.
percent_free = 100 - psutil.virtual_memory().percent percent_free = 100 - psutil.virtual_memory().percent
memory_msg = "System memory is %s%% free." % str(round(percent_free)) memory_msg = f"System memory is {round(percent_free)!s}% free."
if percent_free >= 20: if percent_free >= 20:
if rounded_values: memory_msg = "System free memory is at least 20%." if rounded_values: memory_msg = "System free memory is at least 20%."
output.print_ok(memory_msg) output.print_ok(memory_msg)
@ -478,7 +478,7 @@ def check_primary_hostname_dns(domain, env, output, dns_domains, dns_zonefiles):
tlsa25 = query_dns(tlsa_qname, "TLSA", nxdomain=None) tlsa25 = query_dns(tlsa_qname, "TLSA", nxdomain=None)
tlsa25_expected = build_tlsa_record(env) tlsa25_expected = build_tlsa_record(env)
if tlsa25 == tlsa25_expected: if tlsa25 == tlsa25_expected:
output.print_ok("""The DANE TLSA record for incoming mail is correct (%s).""" % tlsa_qname,) output.print_ok(f"""The DANE TLSA record for incoming mail is correct ({tlsa_qname}).""",)
elif tlsa25 is None: elif tlsa25 is None:
if has_dnssec: if has_dnssec:
# Omit a warning about it not being set if DNSSEC isn't enabled, # Omit a warning about it not being set if DNSSEC isn't enabled,
@ -497,9 +497,9 @@ def check_alias_exists(alias_name, alias, env, output):
if mail_aliases[alias]: if mail_aliases[alias]:
output.print_ok(f"{alias_name} exists as a mail alias. [{alias}{mail_aliases[alias]}]") output.print_ok(f"{alias_name} exists as a mail alias. [{alias}{mail_aliases[alias]}]")
else: else:
output.print_error("""You must set the destination of the mail alias for %s to direct email to you or another administrator.""" % alias) output.print_error(f"""You must set the destination of the mail alias for {alias} to direct email to you or another administrator.""")
else: else:
output.print_error("""You must add a mail alias for %s which directs email to you or another administrator.""" % alias) output.print_error(f"""You must add a mail alias for {alias} which directs email to you or another administrator.""")
def check_dns_zone(domain, env, output, dns_zonefiles): def check_dns_zone(domain, env, output, dns_zonefiles):
# If a DS record is set at the registrar, check DNSSEC first because it will affect the NS query. # If a DS record is set at the registrar, check DNSSEC first because it will affect the NS query.
@ -527,7 +527,7 @@ def check_dns_zone(domain, env, output, dns_zonefiles):
probably_external_dns = False probably_external_dns = False
if existing_ns.lower() == correct_ns.lower(): if existing_ns.lower() == correct_ns.lower():
output.print_ok("Nameservers are set correctly at registrar. [%s]" % correct_ns) output.print_ok(f"Nameservers are set correctly at registrar. [{correct_ns}]")
elif ip == correct_ip: elif ip == correct_ip:
# The domain resolves correctly, so maybe the user is using External DNS. # The domain resolves correctly, so maybe the user is using External DNS.
output.print_warning(f"""The nameservers set on this domain at your domain name registrar should be {correct_ns}. They are currently {existing_ns}. output.print_warning(f"""The nameservers set on this domain at your domain name registrar should be {correct_ns}. They are currently {existing_ns}.
@ -546,7 +546,7 @@ def check_dns_zone(domain, env, output, dns_zonefiles):
# We must first resolve the nameserver to an IP address so we can query it. # We must first resolve the nameserver to an IP address so we can query it.
ns_ips = query_dns(ns, "A") ns_ips = query_dns(ns, "A")
if not ns_ips or ns_ips in {'[Not Set]', '[timeout]'}: if not ns_ips or ns_ips in {'[Not Set]', '[timeout]'}:
output.print_error("Secondary nameserver %s is not valid (it doesn't resolve to an IP address)." % ns) output.print_error(f"Secondary nameserver {ns} is not valid (it doesn't resolve to an IP address).")
continue continue
# Choose the first IP if nameserver returns multiple # Choose the first IP if nameserver returns multiple
ns_ip = ns_ips.split('; ')[0] ns_ip = ns_ips.split('; ')[0]
@ -587,7 +587,7 @@ def check_dns_zone_suggestions(domain, env, output, dns_zonefiles, domains_with_
if domain in domains_with_a_records: if domain in domains_with_a_records:
output.print_warning("""Web has been disabled for this domain because you have set a custom DNS record.""") output.print_warning("""Web has been disabled for this domain because you have set a custom DNS record.""")
if "www." + domain in domains_with_a_records: if "www." + domain in domains_with_a_records:
output.print_warning("""A redirect from 'www.%s' has been disabled for this domain because you have set a custom DNS record on the www subdomain.""" % domain) output.print_warning(f"""A redirect from 'www.{domain}' has been disabled for this domain because you have set a custom DNS record on the www subdomain.""")
# Since DNSSEC is optional, if a DS record is NOT set at the registrar suggest it. # Since DNSSEC is optional, if a DS record is NOT set at the registrar suggest it.
# (If it was set, we did the check earlier.) # (If it was set, we did the check earlier.)
@ -616,11 +616,11 @@ def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False):
# Some registrars may want the public key so they can compute the digest. The DS # Some registrars may want the public key so they can compute the digest. The DS
# record that we suggest using is for the KSK (and that's how the DS records were generated). # record that we suggest using is for the KSK (and that's how the DS records were generated).
# We'll also give the nice name for the key algorithm. # We'll also give the nice name for the key algorithm.
dnssec_keys = load_env_vars_from_file(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/%s.conf' % alg_name_map[ds_alg])) dnssec_keys = load_env_vars_from_file(os.path.join(env['STORAGE_ROOT'], f'dns/dnssec/{alg_name_map[ds_alg]}.conf'))
with open(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/' + dnssec_keys['KSK'] + '.key'), encoding="utf-8") as f: with open(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/' + dnssec_keys['KSK'] + '.key'), encoding="utf-8") as f:
dnsssec_pubkey = f.read().split("\t")[3].split(" ")[3] dnsssec_pubkey = f.read().split("\t")[3].split(" ")[3]
expected_ds_records[ (ds_keytag, ds_alg, ds_digalg, ds_digest) ] = { expected_ds_records[ ds_keytag, ds_alg, ds_digalg, ds_digest ] = {
"record": rr_ds, "record": rr_ds,
"keytag": ds_keytag, "keytag": ds_keytag,
"alg": ds_alg, "alg": ds_alg,
@ -653,16 +653,16 @@ def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False):
if {r[1] for r in matched_ds} == { '13' } and {r[2] for r in matched_ds} <= { '2', '4' }: # all are alg 13 and digest type 2 or 4 if {r[1] for r in matched_ds} == { '13' } and {r[2] for r in matched_ds} <= { '2', '4' }: # all are alg 13 and digest type 2 or 4
output.print_ok("DNSSEC 'DS' record is set correctly at registrar.") output.print_ok("DNSSEC 'DS' record is set correctly at registrar.")
return return
elif len([r for r in matched_ds if r[1] == '13' and r[2] in { '2', '4' }]) > 0: # some but not all are alg 13 if len([r for r in matched_ds if r[1] == '13' and r[2] in { '2', '4' }]) > 0: # some but not all are alg 13
output.print_ok("DNSSEC 'DS' record is set correctly at registrar. (Records using algorithm other than ECDSAP256SHA256 and digest types other than SHA-256/384 should be removed.)") output.print_ok("DNSSEC 'DS' record is set correctly at registrar. (Records using algorithm other than ECDSAP256SHA256 and digest types other than SHA-256/384 should be removed.)")
return return
else: # no record uses alg 13 # no record uses alg 13
output.print_warning("""DNSSEC 'DS' record set at registrar is valid but should be updated to ECDSAP256SHA256 and SHA-256 (see below). output.print_warning("""DNSSEC 'DS' record set at registrar is valid but should be updated to ECDSAP256SHA256 and SHA-256 (see below).
IMPORTANT: Do not delete existing DNSSEC 'DS' records for this domain until confirmation that the new DNSSEC 'DS' record IMPORTANT: Do not delete existing DNSSEC 'DS' records for this domain until confirmation that the new DNSSEC 'DS' record
for this domain is valid.""") for this domain is valid.""")
else: else:
if is_checking_primary: if is_checking_primary:
output.print_error("""The DNSSEC 'DS' record for %s is incorrect. See further details below.""" % domain) output.print_error(f"""The DNSSEC 'DS' record for {domain} is incorrect. See further details below.""")
return return
output.print_error("""This domain's DNSSEC DS record is incorrect. The chain of trust is broken between the public DNS system output.print_error("""This domain's DNSSEC DS record is incorrect. The chain of trust is broken between the public DNS system
and this machine's DNS server. It may take several hours for public DNS to update after a change. If you did not recently and this machine's DNS server. It may take several hours for public DNS to update after a change. If you did not recently
@ -763,7 +763,7 @@ def check_mail_domain(domain, env, output):
# Stop if the domain is listed in the Spamhaus Domain Block List. # Stop if the domain is listed in the Spamhaus Domain Block List.
# The user might have chosen a domain that was previously in use by a spammer # The user might have chosen a domain that was previously in use by a spammer
# and will not be able to reliably send mail. # and will not be able to reliably send mail.
# See https://www.spamhaus.org/news/article/807/using-our-public-mirrors-check-your-return-codes-now. for # See https://www.spamhaus.org/news/article/807/using-our-public-mirrors-check-your-return-codes-now. for
# information on spamhaus return codes # information on spamhaus return codes
dbl = query_dns(domain+'.dbl.spamhaus.org', "A", nxdomain=None) dbl = query_dns(domain+'.dbl.spamhaus.org', "A", nxdomain=None)
@ -774,11 +774,11 @@ def check_mail_domain(domain, env, output):
elif dbl == "[Not Set]": elif dbl == "[Not Set]":
output.print_warning(f"Could not connect to dbl.spamhaus.org. Could not determine whether the domain {domain} is blacklisted. Please try again later.") output.print_warning(f"Could not connect to dbl.spamhaus.org. Could not determine whether the domain {domain} is blacklisted. Please try again later.")
elif dbl == "127.255.255.252": elif dbl == "127.255.255.252":
output.print_warning("Incorrect spamhaus query: %s. Could not determine whether the domain %s is blacklisted." % (domain+'.dbl.spamhaus.org', domain)) output.print_warning("Incorrect spamhaus query: {}. Could not determine whether the domain {} is blacklisted.".format(domain+'.dbl.spamhaus.org', domain))
elif dbl == "127.255.255.254": elif dbl == "127.255.255.254":
output.print_warning("Mail-in-a-Box is configured to use a public DNS server. This is not supported by spamhaus. Could not determine whether the domain {} is blacklisted.".format(domain)) output.print_warning(f"Mail-in-a-Box is configured to use a public DNS server. This is not supported by spamhaus. Could not determine whether the domain {domain} is blacklisted.")
elif dbl == "127.255.255.255": elif dbl == "127.255.255.255":
output.print_warning("Too many queries have been performed on the spamhaus server. Could not determine whether the domain {} is blacklisted.".format(domain)) output.print_warning(f"Too many queries have been performed on the spamhaus server. Could not determine whether the domain {domain} is blacklisted.")
else: else:
output.print_error(f"""This domain is listed in the Spamhaus Domain Block List (code {dbl}), output.print_error(f"""This domain is listed in the Spamhaus Domain Block List (code {dbl}),
which may prevent recipients from receiving your mail. which may prevent recipients from receiving your mail.
@ -918,7 +918,7 @@ def list_apt_updates(apt_update=True):
simulated_install = shell("check_output", ["/usr/bin/apt-get", "-qq", "-s", "upgrade"]) simulated_install = shell("check_output", ["/usr/bin/apt-get", "-qq", "-s", "upgrade"])
pkgs = [] pkgs = []
for line in simulated_install.split('\n'): for line in simulated_install.split('\n'):
if line.strip() == "": if not line.strip():
continue continue
if re.match(r'^Conf .*', line): if re.match(r'^Conf .*', line):
# remove these lines, not informative # remove these lines, not informative
@ -947,7 +947,7 @@ def get_latest_miab_version():
from urllib.request import urlopen, HTTPError, URLError from urllib.request import urlopen, HTTPError, URLError
try: try:
return re.search(b'TAG=(.*)', urlopen("https://mailinabox.email/setup.sh?ping=1", timeout=5).read()).group(1).decode("utf8") return re.search(br'TAG=(.*)', urlopen("https://mailinabox.email/setup.sh?ping=1", timeout=5).read()).group(1).decode("utf8")
except (TimeoutError, HTTPError, URLError): except (TimeoutError, HTTPError, URLError):
return None return None
@ -960,14 +960,14 @@ def check_miab_version(env, output):
this_ver = "Unknown" this_ver = "Unknown"
if config.get("privacy", True): if config.get("privacy", True):
output.print_warning("You are running version Mail-in-a-Box %s. Mail-in-a-Box version check disabled by privacy setting." % this_ver) output.print_warning(f"You are running version Mail-in-a-Box {this_ver}. Mail-in-a-Box version check disabled by privacy setting.")
else: else:
latest_ver = get_latest_miab_version() latest_ver = get_latest_miab_version()
if this_ver == latest_ver: if this_ver == latest_ver:
output.print_ok("Mail-in-a-Box is up to date. You are running version %s." % this_ver) output.print_ok(f"Mail-in-a-Box is up to date. You are running version {this_ver}.")
elif latest_ver is None: elif latest_ver is None:
output.print_error("Latest Mail-in-a-Box version could not be determined. You are running version %s." % this_ver) output.print_error(f"Latest Mail-in-a-Box version could not be determined. You are running version {this_ver}.")
else: else:
output.print_error(f"A new version of Mail-in-a-Box is available. You are running version {this_ver}. The latest version is {latest_ver}. For upgrade instructions, see https://mailinabox.email. ") output.print_error(f"A new version of Mail-in-a-Box is available. You are running version {this_ver}. The latest version is {latest_ver}. For upgrade instructions, see https://mailinabox.email. ")
@ -1033,7 +1033,7 @@ def run_and_output_changes(env, pool):
if op in {"replace", "insert"}: if op in {"replace", "insert"}:
BufferedOutput(with_lines=cur_lines[j1:j2]).playback(out) BufferedOutput(with_lines=cur_lines[j1:j2]).playback(out)
for category, prev_lines in prev_status.items(): for category in prev_status.keys():
if category not in cur_status: if category not in cur_status:
out.add_heading(category) out.add_heading(category)
out.print_warning("This section was removed.") out.print_warning("This section was removed.")
@ -1074,7 +1074,7 @@ class FileOutput:
def print_block(self, message, first_line=" "): def print_block(self, message, first_line=" "):
print(first_line, end='', file=self.buf) print(first_line, end='', file=self.buf)
message = re.sub("\n\\s*", " ", message) message = re.sub(r"\n\s*", " ", message)
words = re.split(r"(\s+)", message) words = re.split(r"(\s+)", message)
linelen = 0 linelen = 0
for w in words: for w in words:
@ -1082,7 +1082,7 @@ class FileOutput:
print(file=self.buf) print(file=self.buf)
print(" ", end="", file=self.buf) print(" ", end="", file=self.buf)
linelen = 0 linelen = 0
if linelen == 0 and w.strip() == "": continue if linelen == 0 and not w.strip(): continue
print(w, end="", file=self.buf) print(w, end="", file=self.buf)
linelen += len(w) linelen += len(w)
print(file=self.buf) print(file=self.buf)
@ -1114,7 +1114,7 @@ class ConsoleOutput(FileOutput):
class BufferedOutput: class BufferedOutput:
# Record all of the instance method calls so we can play them back later. # Record all of the instance method calls so we can play them back later.
def __init__(self, with_lines=None): def __init__(self, with_lines=None):
self.buf = with_lines if with_lines else [] self.buf = with_lines or []
def __getattr__(self, attr): def __getattr__(self, attr):
if attr not in {"add_heading", "print_ok", "print_error", "print_warning", "print_block", "print_line"}: if attr not in {"add_heading", "print_ok", "print_error", "print_warning", "print_block", "print_line"}:
raise AttributeError raise AttributeError
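The try_connect() change near the top of this file moves the success return into the try block so the finally clause still closes the socket on every path. A minimal sketch of that shape (host and port are placeholders):

import socket

def can_connect(host, port, timeout=1.0):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(timeout)
    try:
        s.connect((host, port))
        return True    # the finally block still runs before the value is returned
    except OSError:
        return False   # connection refused, timed out, or unreachable
    finally:
        s.close()

print(can_connect("127.0.0.1", 22))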

@ -39,9 +39,9 @@ def load_settings(env):
with open(fn, encoding="utf-8") as f: with open(fn, encoding="utf-8") as f:
config = rtyaml.load(f) config = rtyaml.load(f)
if not isinstance(config, dict): raise ValueError # caught below if not isinstance(config, dict): raise ValueError # caught below
return config
except: except:
return { } return { }
return config
# UTILITIES # UTILITIES
@ -135,8 +135,7 @@ def shell(method, cmd_args, env=None, capture_stderr=False, return_bytes=False,
if not return_bytes and isinstance(ret, bytes): ret = ret.decode("utf8") if not return_bytes and isinstance(ret, bytes): ret = ret.decode("utf8")
if not trap: if not trap:
return ret return ret
else: return code, ret
return code, ret
def create_syslog_handler(): def create_syslog_handler():
import logging.handlers import logging.handlers
@ -173,10 +172,11 @@ def wait_for_service(port, public, env, timeout):
s.settimeout(timeout/3) s.settimeout(timeout/3)
try: try:
s.connect(("127.0.0.1" if not public else env['PUBLIC_IP'], port)) s.connect(("127.0.0.1" if not public else env['PUBLIC_IP'], port))
return True
except OSError: except OSError:
if time.perf_counter() > start+timeout: if time.perf_counter() > start+timeout:
return False return False
else:
return True
time.sleep(min(timeout/4, 1)) time.sleep(min(timeout/4, 1))
def get_ssh_port(): def get_ssh_port():
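shell() and wait_for_service() drop an else branch that followed a return; once one branch returns, the remaining code only runs in the other case anyway. A tiny sketch of the flattened form (the placeholder result stands in for the real subprocess call):

def run(trap=False):
    code, output = 0, "ok"
    if not trap:
        return output
    # No else needed: this line is reached only when trap is True.
    return code, output

print(run())            # ok
print(run(trap=True))   # (0, 'ok')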

@ -167,7 +167,7 @@ def make_domain_config(domain, templates, ssl_certificates, env):
proxy_redirect_off = False proxy_redirect_off = False
frame_options_header_sameorigin = False frame_options_header_sameorigin = False
web_sockets = False web_sockets = False
m = re.search("#(.*)$", url) m = re.search(r"#(.*)$", url)
if m: if m:
for flag in m.group(1).split(","): for flag in m.group(1).split(","):
if flag == "pass-http-host": if flag == "pass-http-host":
@ -178,10 +178,10 @@ def make_domain_config(domain, templates, ssl_certificates, env):
frame_options_header_sameorigin = True frame_options_header_sameorigin = True
elif flag == "web-sockets": elif flag == "web-sockets":
web_sockets = True web_sockets = True
url = re.sub("#(.*)$", "", url) url = re.sub(r"#(.*)$", "", url)
nginx_conf_extra += "\tlocation %s {" % path nginx_conf_extra += f"\tlocation {path} {{"
nginx_conf_extra += "\n\t\tproxy_pass %s;" % url nginx_conf_extra += f"\n\t\tproxy_pass {url};"
if proxy_redirect_off: if proxy_redirect_off:
nginx_conf_extra += "\n\t\tproxy_redirect off;" nginx_conf_extra += "\n\t\tproxy_redirect off;"
if pass_http_host_header: if pass_http_host_header:
@ -198,8 +198,8 @@ def make_domain_config(domain, templates, ssl_certificates, env):
nginx_conf_extra += "\n\t\tproxy_set_header X-Real-IP $remote_addr;" nginx_conf_extra += "\n\t\tproxy_set_header X-Real-IP $remote_addr;"
nginx_conf_extra += "\n\t}\n" nginx_conf_extra += "\n\t}\n"
for path, alias in yaml.get("aliases", {}).items(): for path, alias in yaml.get("aliases", {}).items():
nginx_conf_extra += "\tlocation %s {" % path nginx_conf_extra += f"\tlocation {path} {{"
nginx_conf_extra += "\n\t\talias %s;" % alias nginx_conf_extra += f"\n\t\talias {alias};"
nginx_conf_extra += "\n\t}\n" nginx_conf_extra += "\n\t}\n"
for path, url in yaml.get("redirects", {}).items(): for path, url in yaml.get("redirects", {}).items():
nginx_conf_extra += f"\trewrite {path} {url} permanent;\n" nginx_conf_extra += f"\trewrite {path} {url} permanent;\n"
@ -216,14 +216,14 @@ def make_domain_config(domain, templates, ssl_certificates, env):
# Add in any user customizations in the includes/ folder. # Add in any user customizations in the includes/ folder.
nginx_conf_custom_include = os.path.join(env["STORAGE_ROOT"], "www", safe_domain_name(domain) + ".conf") nginx_conf_custom_include = os.path.join(env["STORAGE_ROOT"], "www", safe_domain_name(domain) + ".conf")
if os.path.exists(nginx_conf_custom_include): if os.path.exists(nginx_conf_custom_include):
nginx_conf_extra += "\tinclude %s;\n" % (nginx_conf_custom_include) nginx_conf_extra += f"\tinclude {nginx_conf_custom_include};\n"
# PUT IT ALL TOGETHER # PUT IT ALL TOGETHER
# Combine the pieces. Iteratively place each template into the "# ADDITIONAL DIRECTIVES HERE" placeholder # Combine the pieces. Iteratively place each template into the "# ADDITIONAL DIRECTIVES HERE" placeholder
# of the previous template. # of the previous template.
nginx_conf = "# ADDITIONAL DIRECTIVES HERE\n" nginx_conf = "# ADDITIONAL DIRECTIVES HERE\n"
for t in [*templates, nginx_conf_extra]: for t in [*templates, nginx_conf_extra]:
nginx_conf = re.sub("[ \t]*# ADDITIONAL DIRECTIVES HERE *\n", t, nginx_conf) nginx_conf = re.sub(r"[ \t]*# ADDITIONAL DIRECTIVES HERE *\n", t, nginx_conf)
# Replace substitution strings in the template & return. # Replace substitution strings in the template & return.
nginx_conf = nginx_conf.replace("$STORAGE_ROOT", env['STORAGE_ROOT']) nginx_conf = nginx_conf.replace("$STORAGE_ROOT", env['STORAGE_ROOT'])
@ -256,10 +256,9 @@ def get_web_domains_info(env):
cert_status, cert_status_details = check_certificate(domain, tls_cert["certificate"], tls_cert["private-key"]) cert_status, cert_status_details = check_certificate(domain, tls_cert["certificate"], tls_cert["private-key"])
if cert_status == "OK": if cert_status == "OK":
return ("success", "Signed & valid. " + cert_status_details) return ("success", "Signed & valid. " + cert_status_details)
elif cert_status == "SELF-SIGNED": if cert_status == "SELF-SIGNED":
return ("warning", "Self-signed. Get a signed certificate to stop warnings.") return ("warning", "Self-signed. Get a signed certificate to stop warnings.")
else: return ("danger", "Certificate has a problem: " + cert_status)
return ("danger", "Certificate has a problem: " + cert_status)
return [ return [
{ {

pyproject.toml (new file)
@ -0,0 +1,69 @@
[tool.ruff]
line-length = 320 # https://github.com/astral-sh/ruff/issues/8106
indent-width = 4
target-version = "py310"
preview = true
output-format = "concise"
extend-exclude = ["tools/mail.py"]
[tool.ruff.lint]
select = [
"F",
"E4",
"E7",
"E9",
"W",
"UP",
"YTT",
"S",
"BLE",
"B",
"A",
"C4",
"T10",
"DJ",
"EM",
"EXE",
"ISC",
"ICN",
"G",
"PIE",
"PYI",
"Q003",
"Q004",
"RSE",
"RET",
"SLF",
"SLOT",
"SIM",
"TID",
"TC",
"ARG",
"PGH",
"PL",
"TRY",
"FLY",
"PERF",
"FURB",
"LOG",
"RUF"
]
ignore = [
"W191",
"PLR09",
"PLR1702",
"PLR2004",
"RUF001",
"RUF002",
"RUF003",
"RUF023"
]
[tool.ruff.format]
quote-style = "preserve"
indent-style = "tab"
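This new file configures Ruff: the selected rule families, the ignores, tab indentation, and a very long line length. A small wrapper showing how the configuration might be exercised locally; ruff check and ruff format are Ruff's standard lint and format commands, and running them from the repository root is an assumption:

import subprocess

# Lint with the rule set declared in pyproject.toml; --fix applies the safe autofixes.
subprocess.run(["ruff", "check", "--fix", "."], check=False)
# Formatting honours quote-style = "preserve" and indent-style = "tab".
subprocess.run(["ruff", "format", "."], check=False)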

@ -23,7 +23,7 @@ def migration_1(env):
# Migrate the 'domains' directory. # Migrate the 'domains' directory.
for sslfn in glob.glob(os.path.join( env["STORAGE_ROOT"], 'ssl/domains/*' )): for sslfn in glob.glob(os.path.join( env["STORAGE_ROOT"], 'ssl/domains/*' )):
fn = os.path.basename(sslfn) fn = os.path.basename(sslfn)
m = re.match("(.*)_(certifiate.pem|cert_sign_req.csr|private_key.pem)$", fn) m = re.match(r"(.*)_(certifiate.pem|cert_sign_req.csr|private_key.pem)$", fn)
if m: if m:
# get the new name for the file # get the new name for the file
domain_name, file_type = m.groups() domain_name, file_type = m.groups()
@ -86,7 +86,9 @@ def migration_7(env):
if newemail != email: if newemail != email:
c = conn.cursor() c = conn.cursor()
c.execute("UPDATE aliases SET source=? WHERE source=?", (newemail, email)) c.execute("UPDATE aliases SET source=? WHERE source=?", (newemail, email))
if c.rowcount != 1: raise ValueError("Alias not found.") if c.rowcount != 1:
msg = "Alias not found."
raise ValueError(msg)
print("Updated alias", email, "to", newemail) print("Updated alias", email, "to", newemail)
except Exception as e: except Exception as e:
print("Error updating IDNA alias", email, e) print("Error updating IDNA alias", email, e)
@ -164,7 +166,7 @@ def migration_12(env):
try: try:
table = table[0] table = table[0]
c = conn.cursor() c = conn.cursor()
dropcmd = "DROP TABLE %s" % table dropcmd = f"DROP TABLE {table}"
c.execute(dropcmd) c.execute(dropcmd)
except: except:
print("Failed to drop table", table) print("Failed to drop table", table)
@ -202,7 +204,7 @@ def get_current_migration():
ver = 0 ver = 0
while True: while True:
next_ver = (ver + 1) next_ver = (ver + 1)
migration_func = globals().get("migration_%d" % next_ver) migration_func = globals().get(f"migration_{next_ver:d}")
if not migration_func: if not migration_func:
return ver return ver
ver = next_ver ver = next_ver
@ -234,14 +236,14 @@ def run_migrations():
while True: while True:
next_ver = (ourver + 1) next_ver = (ourver + 1)
migration_func = globals().get("migration_%d" % next_ver) migration_func = globals().get(f"migration_{next_ver:d}")
if not migration_func: if not migration_func:
# No more migrations to run. # No more migrations to run.
break break
print() print()
print("Running migration to Mail-in-a-Box #%d..." % next_ver) print(f"Running migration to Mail-in-a-Box #{next_ver:d}...")
try: try:
migration_func(env) migration_func(env)
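The migration runner keeps discovering functions by name; only the lookup string is rebuilt as an f-string. A self-contained sketch of that dispatch pattern, with placeholder migration bodies:

def migration_1(env): print("running migration 1")
def migration_2(env): print("running migration 2")

def run_all_migrations(env, current=0):
    ver = current
    while True:
        func = globals().get(f"migration_{ver + 1:d}")
        if not func:
            return ver   # no more migrations to run
        func(env)
        ver += 1

print("now at version", run_all_migrations({}))   # now at version 2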

@ -142,7 +142,8 @@ def http_test(url, expected_status, postdata=None, qsargs=None, auth=None):
# return response status code # return response status code
if r.status_code != expected_status: if r.status_code != expected_status:
r.raise_for_status() # anything but 200 r.raise_for_status() # anything but 200
raise OSError("Got unexpected status code %s." % r.status_code) msg = f"Got unexpected status code {r.status_code}."
raise OSError(msg)
# define how to run a test # define how to run a test
@ -198,7 +199,7 @@ def run_test(testfunc, args, count, within_seconds, parallel):
# Did we make enough requests within the limit? # Did we make enough requests within the limit?
if (time.time()-start_time) > within_seconds: if (time.time()-start_time) > within_seconds:
raise Exception("Test failed to make %s requests in %d seconds." % (count, within_seconds)) raise Exception(f"Test failed to make {count} requests in {within_seconds:d} seconds.")
# Wait a moment for the block to be put into place. # Wait a moment for the block to be put into place.
time.sleep(4) time.sleep(4)
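run_test() keeps asserting that a batch of requests finishes inside a time budget; only the failure message changes form. A stripped-down sketch of that timing check, with a sleep standing in for the real HTTP probe:

import time

def run_within(count, within_seconds, work):
    start = time.time()
    for _ in range(count):
        work()
    # Did we make enough requests within the limit?
    if (time.time() - start) > within_seconds:
        msg = f"Test failed to make {count} requests in {within_seconds:d} seconds."
        raise Exception(msg)

run_within(5, 2, lambda: time.sleep(0.01))   # completes quietly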

@ -51,7 +51,7 @@ def test2(tests, server, description):
response = dns.resolver.resolve(qname, rtype) response = dns.resolver.resolve(qname, rtype)
except dns.resolver.NoNameservers: except dns.resolver.NoNameservers:
# host did not have an answer for this query # host did not have an answer for this query
print("Could not connect to %s for DNS query." % server) print(f"Could not connect to {server} for DNS query.")
sys.exit(1) sys.exit(1)
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
# host did not have an answer for this query; not sure what the # host did not have an answer for this query; not sure what the
@ -79,7 +79,7 @@ def test2(tests, server, description):
# Test the response from the machine itself. # Test the response from the machine itself.
if not test(ipaddr, "Mail-in-a-Box"): if not test(ipaddr, "Mail-in-a-Box"):
print () print ()
print ("Please run the Mail-in-a-Box setup script on %s again." % hostname) print (f"Please run the Mail-in-a-Box setup script on {hostname} again.")
sys.exit(1) sys.exit(1)
else: else:
print ("The Mail-in-a-Box provided correct DNS answers.") print ("The Mail-in-a-Box provided correct DNS answers.")
@ -89,7 +89,7 @@ else:
# to see if the machine is hooked up to recursive DNS properly. # to see if the machine is hooked up to recursive DNS properly.
if not test("8.8.8.8", "Google Public DNS"): if not test("8.8.8.8", "Google Public DNS"):
print () print ()
print ("Check that the nameserver settings for %s are correct at your domain registrar. It may take a few hours for Google Public DNS to update after changes on your Mail-in-a-Box." % hostname) print (f"Check that the nameserver settings for {hostname} are correct at your domain registrar. It may take a few hours for Google Public DNS to update after changes on your Mail-in-a-Box.")
sys.exit(1) sys.exit(1)
else: else:
print ("Your domain registrar or DNS host appears to be configured correctly as well. Public DNS provides the same answers.") print ("Your domain registrar or DNS host appears to be configured correctly as well. Public DNS provides the same answers.")

@ -46,7 +46,7 @@ reverse_ip = dns.reversename.from_address(ipaddr) # e.g. "1.0.0.127.in-addr.arpa
try: try:
reverse_dns = dns.resolver.resolve(reverse_ip, 'PTR')[0].target.to_text(omit_final_dot=True) # => hostname reverse_dns = dns.resolver.resolve(reverse_ip, 'PTR')[0].target.to_text(omit_final_dot=True) # => hostname
except dns.resolver.NXDOMAIN: except dns.resolver.NXDOMAIN:
print("Reverse DNS lookup failed for %s. SMTP EHLO name check skipped." % ipaddr) print(f"Reverse DNS lookup failed for {ipaddr}. SMTP EHLO name check skipped.")
reverse_dns = None reverse_dns = None
if reverse_dns is not None: if reverse_dns is not None:
server.ehlo_or_helo_if_needed() # must send EHLO before getting the server's EHLO name server.ehlo_or_helo_if_needed() # must send EHLO before getting the server's EHLO name
@ -54,7 +54,7 @@ if reverse_dns is not None:
if helo_name != reverse_dns: if helo_name != reverse_dns:
print("The server's EHLO name does not match its reverse hostname. Check DNS settings.") print("The server's EHLO name does not match its reverse hostname. Check DNS settings.")
else: else:
print("SMTP EHLO name (%s) is OK." % helo_name) print(f"SMTP EHLO name ({helo_name}) is OK.")
# Login and send a test email. # Login and send a test email.
server.login(emailaddress, pw) server.login(emailaddress, pw)

@ -69,7 +69,7 @@ MOZILLA_CIPHERS_OLD = "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305
def sslyze(opts, port, ok_ciphers): def sslyze(opts, port, ok_ciphers):
# Print header. # Print header.
header = ("PORT %d" % port) header = (f"PORT {port:d}")
print(header) print(header)
print("-" * (len(header))) print("-" * (len(header)))
@ -83,7 +83,7 @@ def sslyze(opts, port, ok_ciphers):
proxy_proc = None proxy_proc = None
if proxy: if proxy:
connection_string = "localhost:10023" connection_string = "localhost:10023"
proxy_proc = subprocess.Popen(["ssh", "-N", "-L10023:%s:%d" % (host, port), proxy]) proxy_proc = subprocess.Popen(["ssh", "-N", f"-L10023:{host}:{port:d}", proxy])
time.sleep(3) time.sleep(3)
try: try:
@ -94,9 +94,9 @@ def sslyze(opts, port, ok_ciphers):
# Trim output to make better for storing in git. # Trim output to make better for storing in git.
if "SCAN RESULTS FOR" not in out: if "SCAN RESULTS FOR" not in out:
# Failed. Just output the error. # Failed. Just output the error.
out = re.sub("[\\w\\W]*CHECKING HOST\\(S\\) AVAILABILITY\n\\s*-+\n", "", out) # chop off header that shows the host we queried out = re.sub(r"[\w\W]*CHECKING HOST\(S\) AVAILABILITY\n\s*-+\n", "", out) # chop off header that shows the host we queried
out = re.sub("[\\w\\W]*SCAN RESULTS FOR.*\n\\s*-+\n", "", out) # chop off header that shows the host we queried out = re.sub(r"[\w\W]*SCAN RESULTS FOR.*\n\s*-+\n", "", out) # chop off header that shows the host we queried
out = re.sub("SCAN COMPLETED IN .*", "", out) out = re.sub(r"SCAN COMPLETED IN .*", "", out)
out = out.rstrip(" \n-") + "\n" out = out.rstrip(" \n-") + "\n"
# Print. # Print.
@ -105,8 +105,8 @@ def sslyze(opts, port, ok_ciphers):
# Pull out the accepted ciphers list for each SSL/TLS protocol # Pull out the accepted ciphers list for each SSL/TLS protocol
# version outputted. # version outputted.
accepted_ciphers = set() accepted_ciphers = set()
for ciphers in re.findall(" Accepted:([\\w\\W]*?)\n *\n", out): for ciphers in re.findall(r" Accepted:([\w\W]*?)\n *\n", out):
accepted_ciphers |= set(re.findall("\n\\s*(\\S*)", ciphers)) accepted_ciphers |= set(re.findall(r"\n\s*(\S*)", ciphers))
# Compare to what Mozilla recommends, for a given modernness-level. # Compare to what Mozilla recommends, for a given modernness-level.
print(" Should Not Offer: " + (", ".join(sorted(accepted_ciphers-set(ok_ciphers))) or "(none -- good)")) print(" Should Not Offer: " + (", ".join(sorted(accepted_ciphers-set(ok_ciphers))) or "(none -- good)"))

@ -32,7 +32,7 @@ for fn in glob.glob("/var/log/nginx/access.log*"):
# Aggregate by date. # Aggregate by date.
by_date = { } by_date = { }
for date, ip in accesses: for date, _ip in accesses:
by_date[date] = by_date.get(date, 0) + 1 by_date[date] = by_date.get(date, 0) + 1
# Since logs are rotated, store the statistics permanently in a JSON file. # Since logs are rotated, store the statistics permanently in a JSON file.
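Besides renaming the unused loop variable to _ip, the hunk shows the dict.get counting idiom unchanged. A tiny sketch with made-up access tuples:

accesses = [("2025-02-17", "1.2.3.4"), ("2025-02-17", "5.6.7.8"), ("2025-02-18", "1.2.3.4")]

by_date = {}
for date, _ip in accesses:   # a leading underscore marks the value as intentionally unused
    by_date[date] = by_date.get(date, 0) + 1

print(by_date)   # {'2025-02-17': 2, '2025-02-18': 1}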

@ -124,14 +124,14 @@ def generate_documentation():
""") """)
parser = Source.parser() parser = Source.parser()
with open("setup/start.sh", "r") as start_file: with open("setup/start.sh", encoding="utf-8") as start_file:
for line in start_file: for line in start_file:
try: try:
fn = parser.parse_string(line).filename() fn = parser.parse_string(line).filename()
except: except:
continue continue
if fn in ("setup/start.sh", "setup/preflight.sh", "setup/questions.sh", "setup/firstuser.sh", "setup/management.sh"): if fn in {"setup/start.sh", "setup/preflight.sh", "setup/questions.sh", "setup/firstuser.sh", "setup/management.sh"}:
continue continue
import sys import sys
print(fn, file=sys.stderr) print(fn, file=sys.stderr)
@ -171,7 +171,7 @@ def strip_indent(s):
class Comment(Grammar): class Comment(Grammar):
grammar = ONE_OR_MORE(ZERO_OR_MORE(SPACE), L('#'), REST_OF_LINE, EOL) grammar = ONE_OR_MORE(ZERO_OR_MORE(SPACE), L('#'), REST_OF_LINE, EOL)
def value(self): def value(self):
if self.string.replace("#", "").strip() == "": if not self.string.replace("#", "").strip():
return "\n" return "\n"
lines = [x[2].string for x in self[0]] lines = [x[2].string for x in self[0]]
content = "\n".join(lines) content = "\n".join(lines)
@ -192,8 +192,7 @@ class CatEOF(Grammar):
def value(self): def value(self):
content = self[9].string content = self[9].string
content = re.sub(r"\\([$])", r"\1", content) # un-escape bash-escaped characters content = re.sub(r"\\([$])", r"\1", content) # un-escape bash-escaped characters
return "<div class='write-to'><div class='filename'>%s <span>(%s)</span></div><pre>%s</pre></div>\n" \ return "<div class='write-to'><div class='filename'>{} <span>({})</span></div><pre>{}</pre></div>\n".format(self[4].string,
% (self[4].string,
"overwrite" if ">>" not in self[2].string else "append to", "overwrite" if ">>" not in self[2].string else "append to",
cgi.escape(content)) cgi.escape(content))
@ -223,14 +222,14 @@ class EditConf(Grammar):
EOL EOL
) )
def value(self): def value(self):
conffile = self[1] # conffile = self[1]
options = [] options = []
eq = "=" eq = "="
if self[3] and "-s" in self[3].string: eq = " " if self[3] and "-s" in self[3].string: eq = " "
for opt in re.split("\s+", self[4].string): for opt in re.split(r"\s+", self[4].string):
k, v = opt.split("=", 1) k, v = opt.split("=", 1)
v = re.sub(r"\n+", "", fixup_tokens(v)) # not sure why newlines are getting doubled v = re.sub(r"\n+", "", fixup_tokens(v)) # not sure why newlines are getting doubled
options.append("%s%s%s" % (k, eq, v)) options.append(f"{k}{eq}{v}")
return "<div class='write-to'><div class='filename'>" + self[1].string + " <span>(change settings)</span></div><pre>" + "\n".join(cgi.escape(s) for s in options) + "</pre></div>\n" return "<div class='write-to'><div class='filename'>" + self[1].string + " <span>(change settings)</span></div><pre>" + "\n".join(cgi.escape(s) for s in options) + "</pre></div>\n"
class CaptureOutput(Grammar): class CaptureOutput(Grammar):
@ -248,8 +247,8 @@ class SedReplace(Grammar):
class EchoPipe(Grammar): class EchoPipe(Grammar):
grammar = OPTIONAL(SPACE), L("echo "), REST_OF_LINE, L(' | '), REST_OF_LINE, EOL grammar = OPTIONAL(SPACE), L("echo "), REST_OF_LINE, L(' | '), REST_OF_LINE, EOL
def value(self): def value(self):
text = " ".join("\"%s\"" % s for s in self[2].string.split(" ")) text = " ".join(f'"{s}"' for s in self[2].string.split(" "))
return "<pre class='shell'><div>echo " + recode_bash(text) + " \<br> | " + recode_bash(self[4].string) + "</div></pre>\n" return "<pre class='shell'><div>echo " + recode_bash(text) + r" \<br> | " + recode_bash(self[4].string) + "</div></pre>\n"
def shell_line(bash): def shell_line(bash):
return "<pre class='shell'><div>" + recode_bash(bash.strip()) + "</div></pre>\n" return "<pre class='shell'><div>" + recode_bash(bash.strip()) + "</div></pre>\n"
@ -274,7 +273,7 @@ class RestartService(Grammar):
class OtherLine(Grammar): class OtherLine(Grammar):
grammar = (REST_OF_LINE, EOL) grammar = (REST_OF_LINE, EOL)
def value(self): def value(self):
if self.string.strip() == "": return "" if not self.string.strip(): return ""
if "source setup/functions.sh" in self.string: return "" if "source setup/functions.sh" in self.string: return ""
if "source /etc/mailinabox.conf" in self.string: return "" if "source /etc/mailinabox.conf" in self.string: return ""
return "<pre class='shell'><div>" + recode_bash(self.string.strip()) + "</div></pre>\n" return "<pre class='shell'><div>" + recode_bash(self.string.strip()) + "</div></pre>\n"
@ -324,7 +323,7 @@ def quasitokenize(bashscript):
elif c == "\\": elif c == "\\":
# Escaping next character. # Escaping next character.
escape_next = True escape_next = True
elif quote_mode is None and c in ('"', "'"): elif quote_mode is None and c in {'"', "'"}:
# Starting a quoted word. # Starting a quoted word.
quote_mode = c quote_mode = c
elif c == quote_mode: elif c == quote_mode:
@ -364,9 +363,9 @@ def quasitokenize(bashscript):
newscript += c newscript += c
# "<< EOF" escaping. # "<< EOF" escaping.
if quote_mode is None and re.search("<<\s*EOF\n$", newscript): if quote_mode is None and re.search(r"<<\s*EOF\n$", newscript):
quote_mode = "EOF" quote_mode = "EOF"
elif quote_mode == "EOF" and re.search("\nEOF\n$", newscript): elif quote_mode == "EOF" and re.search(r"\nEOF\n$", newscript):
quote_mode = None quote_mode = None
return newscript return newscript
@ -378,7 +377,7 @@ def recode_bash(s):
tok = tok.replace(c, "\\" + c) tok = tok.replace(c, "\\" + c)
tok = fixup_tokens(tok) tok = fixup_tokens(tok)
if " " in tok or '"' in tok: if " " in tok or '"' in tok:
tok = tok.replace("\"", "\\\"") tok = tok.replace('"', '\\"')
tok = '"' + tok +'"' tok = '"' + tok +'"'
else: else:
tok = tok.replace("'", "\\'") tok = tok.replace("'", "\\'")
@ -401,25 +400,24 @@ class BashScript(Grammar):
@staticmethod @staticmethod
def parse(fn): def parse(fn):
if fn in ("setup/functions.sh", "/etc/mailinabox.conf"): return "" if fn in {"setup/functions.sh", "/etc/mailinabox.conf"}: return ""
with open(fn, "r") as f: with open(fn, encoding="utf-8") as f:
string = f.read() string = f.read()
# tokenize # tokenize
string = re.sub(".* #NODOC\n", "", string) string = re.sub(r".* #NODOC\n", "", string)
string = re.sub("\n\s*if .*then.*|\n\s*fi|\n\s*else|\n\s*elif .*", "", string) string = re.sub(r"\n\s*if .*then.*|\n\s*fi|\n\s*else|\n\s*elif .*", "", string)
string = quasitokenize(string) string = quasitokenize(string)
string = re.sub("hide_output ", "", string) string = string.replace("hide_output ", "")
parser = BashScript.parser() parser = BashScript.parser()
result = parser.parse_string(string) result = parser.parse_string(string)
v = "<div class='row'><div class='col-xs-12 sourcefile'>view the bash source for the following section at <a href=\"%s\">%s</a></div></div>\n" \ v = "<div class='row'><div class='col-xs-12 sourcefile'>view the bash source for the following section at <a href=\"{}\">{}</a></div></div>\n".format("https://github.com/mail-in-a-box/mailinabox/tree/master/" + fn, fn)
% ("https://github.com/mail-in-a-box/mailinabox/tree/master/" + fn, fn)
mode = 0 mode = 0
for item in result.value(): for item in result.value():
if item.strip() == "": if not item.strip():
pass pass
elif item.startswith("<p") and not item.startswith("<pre"): elif item.startswith("<p") and not item.startswith("<pre"):
clz = "" clz = ""
@ -429,7 +427,7 @@ class BashScript(Grammar):
mode = 0 mode = 0
clz = "contd" clz = "contd"
if mode == 0: if mode == 0:
v += "<div class='row %s'>\n" % clz v += f"<div class='row {clz}'>\n"
v += "<div class='col-md-6 prose'>\n" v += "<div class='col-md-6 prose'>\n"
v += item v += item
mode = 1 mode = 1
@ -460,24 +458,23 @@ class BashScript(Grammar):
v = fixup_tokens(v) v = fixup_tokens(v)
v = v.replace("</pre>\n<pre class='shell'>", "") v = v.replace("</pre>\n<pre class='shell'>", "")
v = re.sub("<pre>([\w\W]*?)</pre>", lambda m : "<pre>" + strip_indent(m.group(1)) + "</pre>", v) v = re.sub(r"<pre>([\w\W]*?)</pre>", lambda m : "<pre>" + strip_indent(m.group(1)) + "</pre>", v)
v = re.sub(r"(\$?)PRIMARY_HOSTNAME", r"<b>box.yourdomain.com</b>", v) v = re.sub(r"(\$?)PRIMARY_HOSTNAME", r"<b>box.yourdomain.com</b>", v)
v = re.sub(r"\$STORAGE_ROOT", r"<b>$STORE</b>", v) v = re.sub(r"\$STORAGE_ROOT", r"<b>$STORE</b>", v)
v = v.replace("`pwd`", "<code><b>/path/to/mailinabox</b></code>") return v.replace("`pwd`", "<code><b>/path/to/mailinabox</b></code>")
return v
def wrap_lines(text, cols=60): def wrap_lines(text, cols=60):
ret = "" ret = ""
words = re.split("(\s+)", text) words = re.split(r"(\s+)", text)
linelen = 0 linelen = 0
for w in words: for w in words:
if linelen + len(w) > cols-1: if linelen + len(w) > cols-1:
ret += " \\\n" ret += " \\\n"
ret += " " ret += " "
linelen = 0 linelen = 0
if linelen == 0 and w.strip() == "": continue if linelen == 0 and not w.strip(): continue
ret += w ret += w
linelen += len(w) linelen += len(w)
return ret return ret
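Two substitutions recur throughout this documentation generator: re.sub with a fixed string becomes str.replace, and tuple membership tests become set literals. A short sketch of both, using an invented line of shell text:

line = 'hide_output apt-get install -y "nginx"'

# No regex metacharacters are involved, so str.replace is the clearer tool.
cleaned = line.replace("hide_output ", "")

# The diff converts membership tests like c in ('"', "'") to set literals.
quote_chars = {'"', "'"}
print(cleaned)                      # apt-get install -y "nginx"
print(cleaned[-1] in quote_chars)   # True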