1
0
mirror of https://github.com/mail-in-a-box/mailinabox.git synced 2026-03-18 18:07:22 +01:00

Store and set alias receivers and senders separately for maximum control

This commit is contained in:
David Piggott
2015-07-04 16:31:11 +01:00
parent 3fdfad27cd
commit e6ff280984
6 changed files with 186 additions and 146 deletions

View File

@@ -58,7 +58,7 @@ def sanitize_idn_email_address(email):
except (ValueError, idna.IDNAError):
# ValueError: String does not have a single @-sign, so it is not
# a valid email address. IDNAError: Domain part is not IDNA-valid.
# Validation is not this function's job, so return value unchanged.
# Validation is not this function's job, so return value unchanged.
# If there are non-ASCII characters it will be filtered out by
# validate_email.
return email
@@ -181,13 +181,13 @@ def get_admins(env):
return users
def get_mail_aliases(env):
# Returns a sorted list of tuples of (alias, forward-to string, applies-to-inbound-mail, applies-to-outbound-mail).
# Returns a sorted list of tuples of (address, forward-tos, permitted-senders).
c = open_database(env)
c.execute('SELECT source, destination, applies_inbound, applies_outbound FROM aliases')
aliases = { row[0]: row[1:4] for row in c.fetchall() } # make dict
c.execute('SELECT address, receivers, senders FROM aliases')
aliases = { row[0]: row[1:3] for row in c.fetchall() } # make dict
# put in a canonical order: sort by domain, then by email address lexicographically
aliases = [ (source,) + aliases[source] for source in utils.sort_email_addresses(aliases.keys(), env) ]
aliases = [ (address,) + aliases[address] for address in utils.sort_email_addresses(aliases.keys(), env) ]
return aliases
def get_mail_aliases_ex(env):
@@ -199,11 +199,10 @@ def get_mail_aliases_ex(env):
# domain: "domain.tld",
# alias: [
# {
# source: "name@domain.tld", # IDNA-encoded
# source_display: "name@domain.tld", # full Unicode
# destination: ["target1@domain.com", "target2@domain.com", ...],
# applies_inbound: True|False
# applies_outbound: True|False
# address: "name@domain.tld", # IDNA-encoded
# address_display: "name@domain.tld", # full Unicode
# receivers: ["user1@domain.com", "receiver-only1@domain.com", ...],
# senders: ["user1@domain.com", "sender-only1@domain.com", ...],
# required: True|False
# },
# ...
@@ -214,10 +213,10 @@ def get_mail_aliases_ex(env):
required_aliases = get_required_aliases(env)
domains = {}
for source, destination, applies_inbound, applies_outbound in get_mail_aliases(env):
for address, receivers, senders in get_mail_aliases(env):
# get alias info
domain = get_domain(source)
required = (source in required_aliases)
domain = get_domain(address)
required = (address in required_aliases)
# add to list
if not domain in domains:
@@ -226,20 +225,19 @@ def get_mail_aliases_ex(env):
"aliases": [],
}
domains[domain]["aliases"].append({
"source": source,
"source_display": prettify_idn_email_address(source),
"destination": [prettify_idn_email_address(d.strip()) for d in destination.split(",")],
"applies_inbound": True if applies_inbound == 1 else False,
"applies_outbound": True if applies_outbound == 1 else False,
"address": address,
"address_display": prettify_idn_email_address(address),
"receivers": [prettify_idn_email_address(r.strip()) for r in receivers.split(",")],
"senders": [prettify_idn_email_address(s.strip()) for s in senders.split(",")],
"required": required,
})
# Sort domains.
domains = [domains[domain] for domain in utils.sort_domains(domains.keys(), env)]
# Sort aliases within each domain first by required-ness then lexicographically by source address.
# Sort aliases within each domain first by required-ness then lexicographically by address.
for domain in domains:
domain["aliases"].sort(key = lambda alias : (alias["required"], alias["source"]))
domain["aliases"].sort(key = lambda alias : (alias["required"], alias["address"]))
return domains
def get_domain(emailaddr, as_unicode=True):
@@ -253,8 +251,8 @@ def get_mail_domains(env, filter_aliases=lambda alias : True):
# Returns the domain names (IDNA-encoded) of all of the email addresses
# configured on the system.
return set(
[get_domain(addr, as_unicode=False) for addr in get_mail_users(env)]
+ [get_domain(source, as_unicode=False) for source, *_ in get_mail_aliases(env) if filter_aliases(source) ]
[get_domain(login, as_unicode=False) for login in get_mail_users(env)]
+ [get_domain(address, as_unicode=False) for address, *_ in get_mail_aliases(env) if filter_aliases(address) ]
)
def add_mail_user(email, pw, privs, env):
@@ -410,67 +408,82 @@ def add_remove_mail_user_privilege(email, priv, action, env):
return "OK"
def add_mail_alias(source, destination, applies_inbound, applies_outbound, env, update_if_exists=False, do_kick=True):
def add_mail_alias(address, receivers, senders, env, update_if_exists=False, do_kick=True):
# convert Unicode domain to IDNA
source = sanitize_idn_email_address(source)
address = sanitize_idn_email_address(address)
# Our database is case sensitive (oops), which affects mail delivery
# (Postfix always queries in lowercase?), so force lowercase.
source = source.lower()
address = address.lower()
# validate source
source = source.strip()
if source == "":
return ("No incoming email address provided.", 400)
if not validate_email(source, mode='alias'):
return ("Invalid incoming email address (%s)." % source, 400)
# validate address
address = address.strip()
if address == "":
return ("No email address provided.", 400)
if not validate_email(address, mode='alias'):
return ("Invalid email address (%s)." % address, 400)
# validate receivers
validated_receivers = []
receivers = receivers.strip()
# extra checks for email addresses used in domain control validation
is_dcv_source = is_dcv_address(source)
# validate destination
dests = []
destination = destination.strip()
is_dcv_source = is_dcv_address(address)
# Postfix allows a single @domain.tld as the destination, which means
# the local part on the address is preserved in the rewrite. We must
# try to convert Unicode to IDNA first before validating that it's a
# legitimate alias address. Don't allow this sort of rewriting for
# DCV source addresses.
d1 = sanitize_idn_email_address(destination)
if validate_email(d1, mode='alias') and not is_dcv_source:
dests.append(d1)
r1 = sanitize_idn_email_address(receivers)
if validate_email(r1, mode='alias') and not is_dcv_source:
validated_receivers.append(r1)
else:
# Parse comma and \n-separated destination emails & validate. In this
# case, the recipients must be complete email addresses.
for line in destination.split("\n"):
# case, the receivers must be complete email addresses.
for line in receivers.split("\n"):
for email in line.split(","):
email = email.strip()
if email == "": continue
email = sanitize_idn_email_address(email) # Unicode => IDNA
if not validate_email(email):
return ("Invalid destination email address (%s)." % email, 400)
return ("Invalid receiver email address (%s)." % email, 400)
if is_dcv_source and not is_dcv_address(email) and "admin" not in get_mail_user_privileges(email, env, empty_on_error=True):
# Make domain control validation hijacking a little harder to mess up by
# requiring aliases for email addresses typically used in DCV to forward
# only to accounts that are administrators on this system.
return ("This alias can only have administrators of this system as destinations because the address is frequently used for domain control validation.", 400)
dests.append(email)
if len(destination) == 0:
return ("No destination email address(es) provided.", 400)
destination = ",".join(dests)
validated_receivers.append(email)
receivers = ",".join(validated_receivers)
valid_logins = get_mail_users(env)
# validate senders
validated_senders = []
senders = senders.strip()
# Parse comma and \n-separated sender logins & validate. The senders must be
# valid usernames.
for line in senders.split("\n"):
for login in line.split(","):
login = login.strip()
if login == "": continue
if login not in valid_logins:
return ("Invalid sender login (%s)." % login, 400)
validated_senders.append(login)
senders = ",".join(validated_senders)
# save to db
conn, c = open_database(env, with_connection=True)
try:
c.execute("INSERT INTO aliases (source, destination, applies_inbound, applies_outbound) VALUES (?, ?, ?, ?)", (source, destination, 1 if applies_inbound else 0, 1 if applies_outbound else 0))
c.execute("INSERT INTO aliases (address, receivers, senders) VALUES (?, ?, ?)", (address, receivers, senders))
return_status = "alias added"
except sqlite3.IntegrityError:
if not update_if_exists:
return ("Alias already exists (%s)." % source, 400)
return ("Alias already exists (%s)." % address, 400)
else:
c.execute("UPDATE aliases SET destination = ?, applies_inbound = ?, applies_outbound = ? WHERE source = ?", (destination, 1 if applies_inbound else 0, 1 if applies_outbound else 0, source))
c.execute("UPDATE aliases SET receivers = ?, senders = ? WHERE address = ?", (receivers, senders, address))
return_status = "alias updated"
conn.commit()
@@ -479,15 +492,15 @@ def add_mail_alias(source, destination, applies_inbound, applies_outbound, env,
# Update things in case any new domains are added.
return kick(env, return_status)
def remove_mail_alias(source, env, do_kick=True):
def remove_mail_alias(address, env, do_kick=True):
# convert Unicode domain to IDNA
source = sanitize_idn_email_address(source)
address = sanitize_idn_email_address(address)
# remove
conn, c = open_database(env, with_connection=True)
c.execute("DELETE FROM aliases WHERE source=?", (source,))
c.execute("DELETE FROM aliases WHERE address=?", (address,))
if c.rowcount != 1:
return ("That's not an alias (%s)." % source, 400)
return ("That's not an alias (%s)." % address, 400)
conn.commit()
if do_kick:
@@ -539,34 +552,34 @@ def kick(env, mail_result=None):
existing_aliases = get_mail_aliases(env)
required_aliases = get_required_aliases(env)
def ensure_admin_alias_exists(source):
def ensure_admin_alias_exists(address):
# If a user account exists with that address, we're good.
if source in existing_users:
if address in existing_users:
return
# Does this alias exists?
for s, *_ in existing_aliases:
if s == source:
for a, *_ in existing_aliases:
if a == address:
return
# Doesn't exist.
administrator = get_system_administrator(env)
if source == administrator: return # don't make an alias from the administrator to itself --- this alias must be created manually
add_mail_alias(source, administrator, True, True, env, do_kick=False)
results.append("added alias %s (=> %s)\n" % (source, administrator))
if address == administrator: return # don't make an alias from the administrator to itself --- this alias must be created manually
add_mail_alias(address, administrator, "", env, do_kick=False)
results.append("added alias %s (<==> %s)\n" % (address, administrator))
for alias in required_aliases:
ensure_admin_alias_exists(alias)
for address in required_aliases:
ensure_admin_alias_exists(address)
# Remove auto-generated postmaster/admin on domains we no
# longer have any other email addresses for.
for source, target, *_ in existing_aliases:
user, domain = source.split("@")
for address, receivers, *_ in existing_aliases:
user, domain = address.split("@")
if user in ("postmaster", "admin") \
and source not in required_aliases \
and target == get_system_administrator(env):
remove_mail_alias(source, env, do_kick=False)
results.append("removed alias %s (was to %s; domain no longer used for email)\n" % (source, target))
and address not in required_aliases \
and receivers == get_system_administrator(env):
remove_mail_alias(address, env, do_kick=False)
results.append("removed alias %s (was to %s; domain no longer used for email)\n" % (address, receivers))
# Update DNS and nginx in case any domains are added/removed.