@ -1,5 +1,7 @@
import base64 , os , os . path , hmac , json , secrets
from datetime import timedelta
from expiringdict import ExpiringDict
import utils
from mailconfig import get_mail_password , get_mail_user_privileges
@ -9,16 +11,13 @@ DEFAULT_KEY_PATH = '/var/lib/mailinabox/api.key'
DEFAULT_AUTH_REALM = ' Mail-in-a-Box Management Server '
class AuthService :
""" Generate an API key for authenticating clients
Clients must read the key from the key file and send the key with all HTTP
requests . The key is passed as the username field in the standard HTTP
Basic Auth header .
"""
def __init__ ( self ) :
self . auth_realm = DEFAULT_AUTH_REALM
self . key_path = DEFAULT_KEY_PATH
self . max_session_duration = timedelta ( days = 2 )
self . init_system_api_key ( )
self . sessions = ExpiringDict ( max_len = 64 , max_age_seconds = self . max_session_duration . total_seconds ( ) )
def init_system_api_key ( self ) :
""" Write an API key to a local file so local processes can use the API """
@ -31,123 +30,133 @@ class AuthService:
finally :
os . umask ( old_umask )
self . key = secrets . token_hex ( 24 )
self . key = secrets . token_hex ( 3 2)
os . makedirs ( os . path . dirname ( self . key_path ) , exist_ok = True )
with create_file_with_mode ( self . key_path , 0o640 ) as key_file :
key_file . write ( self . key + ' \n ' )
def authenticate ( self , request , env ) :
""" Test if the client key passed in HTTP Authorization header matches the service key
or if the or username / password passed in the header matches an administrator user .
def authenticate ( self , request , env , login_only = False , logout = False ) :
""" Test if the HTTP Authorization header ' s username matches the system key, a session key,
or if the username / password passed in the header matches a local user .
Returns a tuple of the user ' s email address and list of user privileges (e.g.
( ' my@email ' , [ ] ) or ( ' my@email ' , [ ' admin ' ] ) ; raises a ValueError on login failure .
If the user used an API key , the user ' s email is returned as None. " " "
def decode ( s ) :
return base64 . b64decode ( s . encode ( ' ascii ' ) ) . decode ( ' ascii ' )
If the user used the system API key , the user ' s email is returned as None since
this key is not associated with a user . """
def parse_basic_auth ( header ) :
def parse_http_authorization_basic ( header ) :
def decode ( s ) :
return base64 . b64decode ( s . encode ( ' ascii ' ) ) . decode ( ' ascii ' )
if " " not in header :
return None , None
scheme , credentials = header . split ( maxsplit = 1 )
if scheme != ' Basic ' :
return None , None
credentials = decode ( credentials )
if " : " not in credentials :
return None , None
username , password = credentials . split ( ' : ' , maxsplit = 1 )
return username , password
header = request . headers . get ( ' Authorization ' )
if not header :
raise ValueError ( " No authorization header provided. " )
username , password = parse_basic_auth ( header )
username , password = parse_http_authorization_basic ( request . headers . get ( ' Authorization ' , ' ' ) )
if username in ( None , " " ) :
raise ValueError ( " Authorization header invalid. " )
if username == self . key :
# The user passed the system API key which grants administrative privs.
if username . strip ( ) == " " and password . strip ( ) == " " :
raise ValueError ( " No email address, password, session key, or API key provided. " )
# If user passed the system API key, grant administrative privs. This key
# is not associated with a user.
if username == self . key and not login_only :
return ( None , [ " admin " ] )
else :
# The user is trying to log in with a username and either a password
# (and possibly a MFA token) or a user-specific API key.
return ( username , self . check_user_auth ( username , password , request , env ) )
def check_user_auth ( self , email , pw , request , env ) :
# Validate a user's login email address and password. If MFA is enabled,
# check the MFA token in the X-Auth-Token header.
#
# On success returns a list of privileges (e.g. [] or ['admin']). On login
# failure, raises a ValueError with a login error message.
# Sanity check.
if email == " " or pw == " " :
raise ValueError ( " Enter an email address and password. " )
# The password might be a user-specific API key. create_user_key raises
# a ValueError if the user does not exist.
if hmac . compare_digest ( self . create_user_key ( email , env ) , pw ) :
# OK.
pass
else :
# Get the hashed password of the user. Raise a ValueError if the
# email address does not correspond to a user.
pw_hash = get_mail_password ( email , env )
# If the password corresponds with a session token for the user, grant access for that user.
if password in self . sessions and self . sessions [ password ] [ " email " ] == username and not login_only :
sessionid = password
session = self . sessions [ sessionid ]
if session [ " password_token " ] != self . create_user_password_state_token ( username , env ) :
# This session is invalid because the user's password/MFA state changed
# after the session was created.
del self . sessions [ sessionid ]
raise ValueError ( " Session expired. " )
if logout :
# Clear the session.
del self . sessions [ sessionid ]
else :
# Re-up the session so that it does not expire.
self . sessions [ sessionid ] = session
# If no password was given, but a username was given, we're missing some information.
elif password . strip ( ) == " " :
raise ValueError ( " Enter a password. " )
# Authenticate.
try :
# Use 'doveadm pw' to check credentials. doveadm will return
# a non-zero exit status if the credentials are no good,
# and check_call will raise an exception in that case.
utils . shell ( ' check_call ' , [
" /usr/bin/doveadm " , " pw " ,
" -p " , pw ,
" -t " , pw_hash ,
] )
except :
# Login failed.
raise ValueError ( " Invalid password. " )
# If MFA is enabled, check that MFA passes.
status , hints = validate_auth_mfa ( email , request , env )
if not status :
# Login valid. Hints may have more info.
raise ValueError ( " , " . join ( hints ) )
else :
# The user is trying to log in with a username and a password
# (and possibly a MFA token). On failure, an exception is raised.
self . check_user_auth ( username , password , request , env )
# Get privileges for authorization. This call should never fail because by this
# point we know the email address is a valid user. But on error the call will
# return a tuple of an error message and an HTTP status code.
privs = get_mail_user_privileges ( email , env )
# point we know the email address is a valid user --- unless the user has been
# deleted after the session was granted. On error the call will return a tuple
# of an error message and an HTTP status code.
privs = get_mail_user_privileges ( username , env )
if isinstance ( privs , tuple ) : raise ValueError ( privs [ 0 ] )
# Return a list of privileges .
return privs
# Return the authorization information.
return ( username , privs )
def create_user_key ( self , email , env ) :
# Create a user API key, which is a shared secret that we can re-generate from
# static information in our database. The shared secret contains the user's
# email address, current hashed password, and current MFA state, so that the
# key becomes invalid if any of that information changes.
#
# Use an HMAC to generate the API key using our system API key as a key,
# which also means that the API key becomes invalid when our system API key
# changes --- i.e. when this process is restarted.
def check_user_auth ( self , email , pw , request , env ) :
# Validate a user's login email address and password. If MFA is enabled,
# check the MFA token in the X-Auth-Token header.
#
# Raises ValueError via get_mail_password if the user doesn't exist.
# On login failure, raises a ValueError with a login error message. On
# success, nothing is returned.
# Authenticate.
try :
# Get the hashed password of the user. Raise a ValueError if the
# email address does not correspond to a user. But wrap it in the
# same exception as if a password fails so we don't easily reveal
# if an email address is valid.
pw_hash = get_mail_password ( email , env )
# Construct the HMAC message from the user's email address and current password.
msg = b " AUTH: " + email . encode ( " utf8 " ) + b " " + get_mail_password ( email , env ) . encode ( " utf8 " )
# Use 'doveadm pw' to check credentials. doveadm will return
# a non-zero exit status if the credentials are no good,
# and check_call will raise an exception in that case.
utils . shell ( ' check_call ' , [
" /usr/bin/doveadm " , " pw " ,
" -p " , pw ,
" -t " , pw_hash ,
] )
except :
# Login failed.
raise ValueError ( " Incorrect email address or password. " )
# If MFA is enabled, check that MFA passes.
status , hints = validate_auth_mfa ( email , request , env )
if not status :
# Login valid. Hints may have more info.
raise ValueError ( " , " . join ( hints ) )
def create_user_password_state_token ( self , email , env ) :
# Create a token that changes if the user's password or MFA options change
# so that sessions become invalid if any of that information changes.
msg = get_mail_password ( email , env ) . encode ( " utf8 " )
# Add to the message the current MFA state, which is a list of MFA information.
# Turn it into a string stably.
msg + = b " " + json . dumps ( get_hash_mfa_state ( email , env ) , sort_keys = True ) . encode ( " utf8 " )
# Make the HMAC.
# Make a HMAC using the system API key as a hash key .
hash_key = self . key . encode ( ' ascii ' )
return hmac . new ( hash_key , msg , digestmod = " sha256 " ) . hexdigest ( )
def create_session_key ( self , username , env , type = None ) :
# Create a new session.
token = secrets . token_hex ( 32 )
self . sessions [ token ] = {
" email " : username ,
" password_token " : self . create_user_password_state_token ( username , env ) ,
}
return token