1
0
mirror of https://github.com/mail-in-a-box/mailinabox.git synced 2026-03-20 18:27:23 +01:00

Merge remote-tracking branch 'upstream/main' into merge-upstream

# Conflicts:
#	.gitignore
#	management/auth.py
#	management/daemon.py
#	management/mail_log.py
#	management/mailconfig.py
#	management/mfa.py
#	management/ssl_certificates.py
#	management/status_checks.py
#	management/utils.py
#	management/web_update.py
#	setup/mail-postfix.sh
#	setup/migrate.py
#	setup/preflight.sh
#	setup/webmail.sh
#	tests/test_mail.py
#	tools/editconf.py
This commit is contained in:
downtownallday
2024-03-12 07:41:14 -04:00
33 changed files with 582 additions and 571 deletions

View File

@@ -20,11 +20,11 @@
# service mailinabox start # when done debugging, start it up again
import os, os.path, re, json, time
import multiprocessing.pool, subprocess
import multiprocessing.pool
from functools import wraps
from flask import Flask, request, render_template, abort, Response, send_from_directory, make_response
from flask import Flask, request, render_template, Response, send_from_directory, make_response
import auth, utils
from mailconfig import get_mail_users, get_mail_users_ex, get_admins, add_mail_user, set_mail_password, set_mail_display_name, remove_mail_user
@@ -32,6 +32,7 @@ from mailconfig import get_mail_user_privileges, add_remove_mail_user_privilege
from mailconfig import get_mail_aliases, get_mail_aliases_ex, get_mail_domains, add_mail_alias, remove_mail_alias
from mfa import get_public_mfa_state, enable_mfa, disable_mfa
import mfa_totp
import contextlib
env = utils.load_environment()
@@ -39,14 +40,12 @@ auth_service = auth.AuthService()
# We may deploy via a symbolic link, which confuses flask's template finding.
me = __file__
try:
with contextlib.suppress(OSError):
me = os.readlink(__file__)
except OSError:
pass
# for generating CSRs we need a list of country codes
csr_country_codes = []
with open(os.path.join(os.path.dirname(me), "csr_country_codes.tsv")) as f:
with open(os.path.join(os.path.dirname(me), "csr_country_codes.tsv"), encoding="utf-8") as f:
for line in f:
if line.strip() == "" or line.startswith("#"): continue
code, name = line.strip().split("\t")[0:2]
@@ -90,7 +89,7 @@ def authorized_personnel_only(viewfunc):
# Not authorized. Return a 401 (send auth) and a prompt to authorize by default.
status = 401
headers = {
'WWW-Authenticate': 'Basic realm="{0}"'.format(auth_service.auth_realm),
'WWW-Authenticate': f'Basic realm="{auth_service.auth_realm}"',
'X-Reason': error,
}
@@ -100,7 +99,7 @@ def authorized_personnel_only(viewfunc):
status = 403
headers = None
if request.headers.get('Accept') in (None, "", "*/*"):
if request.headers.get('Accept') in {None, "", "*/*"}:
# Return plain text output.
return Response(error+"\n", status=status, mimetype='text/plain', headers=headers)
else:
@@ -174,7 +173,7 @@ def login():
"api_key": auth_service.create_session_key(email, env, type='login'),
}
app.logger.info("New login session created for {}".format(email))
app.logger.info(f"New login session created for {email}")
# Return.
return json_response(resp)
@@ -183,8 +182,8 @@ def login():
def logout():
try:
email, _ = auth_service.authenticate(request, env, logout=True)
app.logger.info("{} logged out".format(email))
except ValueError as e:
app.logger.info(f"{email} logged out")
except ValueError:
pass
finally:
return json_response({ "status": "ok" })
@@ -374,9 +373,9 @@ def dns_set_record(qname, rtype="A"):
# Get the existing records matching the qname and rtype.
return dns_get_records(qname, rtype)
elif request.method in ("POST", "PUT"):
elif request.method in {"POST", "PUT"}:
# There is a default value for A/AAAA records.
if rtype in ("A", "AAAA") and value == "":
if rtype in {"A", "AAAA"} and value == "":
value = request.environ.get("HTTP_X_FORWARDED_FOR") # normally REMOTE_ADDR but we're behind nginx as a reverse proxy
# Cannot add empty records.
@@ -438,7 +437,7 @@ def ssl_get_status():
{
"domain": d["domain"],
"status": d["ssl_certificate"][0],
"text": d["ssl_certificate"][1] + ((" " + cant_provision[d["domain"]] if d["domain"] in cant_provision else ""))
"text": d["ssl_certificate"][1] + (" " + cant_provision[d["domain"]] if d["domain"] in cant_provision else "")
} for d in domains_status ]
# Warn the user about domain names not hosted here because of other settings.
@@ -510,7 +509,7 @@ def totp_post_enable():
secret = request.form.get('secret')
token = request.form.get('token')
label = request.form.get('label')
if type(token) != str:
if not isinstance(token, str):
return ("Bad Input", 400)
try:
mfa_totp.validate_secret(secret)
@@ -606,8 +605,7 @@ def system_status():
def show_updates():
from status_checks import list_apt_updates
return "".join(
"%s (%s)\n"
% (p["package"], p["version"])
"{} ({})\n".format(p["package"], p["version"])
for p in list_apt_updates())
@app.route('/system/update-packages', methods=["POST"])
@@ -806,14 +804,11 @@ def log_failed_login(request):
# During setup we call the management interface directly to determine the user
# status. So we can't always use X-Forwarded-For because during setup that header
# will not be present.
if request.headers.getlist("X-Forwarded-For"):
ip = request.headers.getlist("X-Forwarded-For")[0]
else:
ip = request.remote_addr
ip = request.headers.getlist("X-Forwarded-For")[0] if request.headers.getlist("X-Forwarded-For") else request.remote_addr
# We need to add a timestamp to the log message, otherwise /dev/log will eat the "duplicate"
# message.
app.logger.warning( "Mail-in-a-Box Management Daemon: Failed login attempt from ip %s - timestamp %s" % (ip, time.time()))
app.logger.warning( f"Mail-in-a-Box Management Daemon: Failed login attempt from ip {ip} - timestamp {time.time()}")
# APP