mirror of
https://github.com/mail-in-a-box/mailinabox.git
synced 2024-11-22 02:17:26 +00:00
53ec0f39cb
* Rename the 'master' API key to be called the 'system' API key * Generate the key using the Python secrets module which is meant for this * Remove some debugging helper code which will be obsoleted by the upcoming changes for session keys
154 lines
5.5 KiB
Python
154 lines
5.5 KiB
Python
import base64, os, os.path, hmac, json, secrets
|
|
|
|
|
|
import utils
|
|
from mailconfig import get_mail_password, get_mail_user_privileges
|
|
from mfa import get_hash_mfa_state, validate_auth_mfa
|
|
|
|
# Default location of the file that the system API key is written to
# (AuthService stores this as its key_path).
DEFAULT_KEY_PATH = '/var/lib/mailinabox/api.key'

# Default realm string stored on AuthService as auth_realm; presumably sent
# in HTTP Basic Auth challenges -- confirm against the callers.
DEFAULT_AUTH_REALM = 'Mail-in-a-Box Management Server'
|
|
|
|
class AuthService:
    """Generate an API key for authenticating clients

    Clients must read the key from the key file and send the key with all HTTP
    requests. The key is passed as the username field in the standard HTTP
    Basic Auth header.
    """

    def __init__(self):
        self.auth_realm = DEFAULT_AUTH_REALM
        self.key_path = DEFAULT_KEY_PATH
        self.init_system_api_key()

    def init_system_api_key(self):
        """Write an API key to a local file so local processes can use the API

        A fresh random key is generated on every call, stored in ``self.key``
        and written (followed by a newline) to ``self.key_path``. The parent
        directory is created if it does not exist.
        """

        def create_file_with_mode(path, mode):
            # Based on answer by A-B-B: http://stackoverflow.com/a/15015748
            # The umask is cleared temporarily so that a newly created file
            # gets exactly the requested mode.
            old_umask = os.umask(0)
            try:
                # O_TRUNC discards any previous contents so that a longer,
                # stale key file cannot leave trailing bytes after the new key.
                flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
                return os.fdopen(os.open(path, flags, mode), 'w', encoding='utf-8')
            finally:
                os.umask(old_umask)

        self.key = secrets.token_hex(24)

        os.makedirs(os.path.dirname(self.key_path), exist_ok=True)

        with create_file_with_mode(self.key_path, 0o640) as key_file:
            key_file.write(self.key + '\n')

    def authenticate(self, request, env):
        """Test if the client key passed in HTTP Authorization header matches the service key
        or if the username/password passed in the header matches an administrator user.
        Returns a tuple of the user's email address and list of user privileges (e.g.
        ('my@email', []) or ('my@email', ['admin']); raises a ValueError on login failure.
        If the user used an API key, the user's email is returned as None."""

        def decode(s):
            return base64.b64decode(s.encode('ascii')).decode('ascii')

        def parse_basic_auth(header):
            # Returns (username, password), or (None, None) if the header is
            # not a well-formed HTTP Basic Authorization header.
            parts = header.split(maxsplit=1)
            if len(parts) != 2:
                return None, None
            scheme, credentials = parts

            # HTTP authentication scheme names are case-insensitive.
            if scheme.lower() != 'basic':
                return None, None

            try:
                credentials = decode(credentials)
            except ValueError:
                # Malformed base64 (binascii.Error) and non-ASCII data
                # (UnicodeEncodeError/UnicodeDecodeError) are all ValueError
                # subclasses; report them as an invalid header.
                return None, None

            if ":" not in credentials:
                return None, None
            username, password = credentials.split(':', maxsplit=1)
            return username, password

        header = request.headers.get('Authorization')
        if not header:
            raise ValueError("No authorization header provided.")

        username, password = parse_basic_auth(header)

        if username in (None, ""):
            raise ValueError("Authorization header invalid.")

        # Compare in constant time so that response timing does not reveal
        # how much of a guessed key is correct. Both values are ASCII-only
        # strings (the username was decoded as ASCII above).
        if hmac.compare_digest(username, self.key):
            # The user passed the system API key which grants administrative privs.
            return (None, ["admin"])

        # The user is trying to log in with a username and either a password
        # (and possibly a MFA token) or a user-specific API key.
        return (username, self.check_user_auth(username, password, request, env))

    def check_user_auth(self, email, pw, request, env):
        """Validate a user's login email address and password. If MFA is enabled,
        check the MFA token in the X-Auth-Token header.

        On success returns a list of privileges (e.g. [] or ['admin']). On login
        failure, raises a ValueError with a login error message."""

        # Sanity check.
        if email == "" or pw == "":
            raise ValueError("Enter an email address and password.")

        # The password might be a user-specific API key. create_user_key raises
        # a ValueError if the user does not exist.
        if hmac.compare_digest(self.create_user_key(email, env), pw):
            # OK.
            pass
        else:
            # Get the hashed password of the user. Raise a ValueError if the
            # email address does not correspond to a user.
            pw_hash = get_mail_password(email, env)

            # Authenticate.
            try:
                # Use 'doveadm pw' to check credentials. doveadm will return
                # a non-zero exit status if the credentials are no good,
                # and check_call will raise an exception in that case.
                # NOTE(review): the plaintext password is passed as a command-line
                # argument, which may be visible to other local processes --
                # confirm whether that is acceptable on this host.
                utils.shell('check_call', [
                    "/usr/bin/doveadm", "pw",
                    "-p", pw,
                    "-t", pw_hash,
                ])
            except Exception as exc:
                # Login failed. Only Exception subclasses are treated as a bad
                # password so that KeyboardInterrupt/SystemExit still propagate.
                raise ValueError("Invalid password.") from exc

        # If MFA is enabled, check that MFA passes.
        status, hints = validate_auth_mfa(email, request, env)
        if not status:
            # Login valid. Hints may have more info.
            raise ValueError(",".join(hints))

        # Get privileges for authorization. This call should never fail because by this
        # point we know the email address is a valid user. But on error the call will
        # return a tuple of an error message and an HTTP status code.
        privs = get_mail_user_privileges(email, env)
        if isinstance(privs, tuple):
            raise ValueError(privs[0])

        # Return a list of privileges.
        return privs

    def create_user_key(self, email, env):
        """Create a user API key, which is a shared secret that we can re-generate from
        static information in our database. The shared secret contains the user's
        email address, current hashed password, and current MFA state, so that the
        key becomes invalid if any of that information changes.

        Use an HMAC to generate the API key using our system API key as a key,
        which also means that the API key becomes invalid when our system API key
        changes --- i.e. when this process is restarted.

        Returns the key as a hexadecimal string.
        Raises ValueError via get_mail_password if the user doesn't exist."""

        # Construct the HMAC message from the user's email address and current password.
        msg = b"AUTH:" + email.encode("utf8") + b" " + get_mail_password(email, env).encode("utf8")

        # Add to the message the current MFA state, which is a list of MFA information.
        # Turn it into a string stably.
        msg += b" " + json.dumps(get_hash_mfa_state(email, env), sort_keys=True).encode("utf8")

        # Make the HMAC.
        hash_key = self.key.encode('ascii')
        return hmac.new(hash_key, msg, digestmod="sha256").hexdigest()
|
|
|