diff --git a/conf/mfa-totp.schema b/conf/mfa-totp.schema index 0bd09266..73b4ac6c 100644 --- a/conf/mfa-totp.schema +++ b/conf/mfa-totp.schema @@ -1,5 +1,5 @@ # -# MiaB-LDAP's directory schema for Time-based one time passwords (TOTP) +# MiaB-LDAP's directory schema for time-based one time passwords (TOTP) # # MiaB LDAP UUID(v4): 7392cdda-5ec8-431f-9936-0000273c0167 # or: 1939000794.24264.17183.39222.658243943 @@ -8,28 +8,48 @@ objectIdentifier MiabLDAProot 2.25.1939000794.24264.17183.39222.658243943 objectIdentifier MiabLDAPmfa MiabLDAProot:1 -objectIdentifier MiabLDAPmfaAttributeType MiabLDAPmfa:3 -objectIdentifier MiabLDAPmfaObjectClass MiabLDAPmfa:4 +objectIdentifier MiabLDAPmfaAttributeType MiabLDAPmfa:2 +objectIdentifier MiabLDAPmfaObjectClass MiabLDAPmfa:3 # secret consists of base32 characters (see RFC 4648) + attributetype ( MiabLDAPmfaAttributeType:1 DESC 'TOTP secret' NAME 'totpSecret' SYNTAX 1.3.6.1.4.1.1466.115.121.1.26 + X-ORDERED 'VALUES' EQUALITY caseExactIA5Match ) + # tokens are a base-10 string of N digits, but set the syntax to # IA5String anyway + attributetype ( MiabLDAPmfaAttributeType:2 DESC 'TOTP last token used' NAME 'totpMruToken' SYNTAX 1.3.6.1.4.1.1466.115.121.1.26 + X-ORDERED 'VALUES' EQUALITY caseExactIA5Match ) -objectClass ( MiabLDAPmfaObjectClass:3 + +# The label is currently any text supplied by the user, which is used +# as a reminder of where the secret is stored when logging in (where +# the authenticator app is, that holds the secret). eg "my samsung +# phone" + +attributetype ( MiabLDAPmfaAttributeType:3 + DESC 'TOTP device label' + NAME 'totpLabel' + SYNTAX 1.3.6.1.4.1.1466.115.121.1.26 + X-ORDERED 'VALUES' + EQUALITY caseIgnoreIA5Match ) + + +# The TOTP objectClass + +objectClass ( MiabLDAPmfaObjectClass:1 NAME 'totpUser' - DESC 'MiaB-LDAP User TOTP settings' + DESC 'MiaB-LDAP TOTP settings for a user' SUP top AUXILIARY - MUST ( totpSecret ) - MAY ( totpMruToken ) ) + MUST ( totpSecret $ totpMruToken $ totpLabel ) ) diff --git a/management/auth.py b/management/auth.py index bee129d6..cf2a4bdc 100644 --- a/management/auth.py +++ b/management/auth.py @@ -1,9 +1,10 @@ -import base64, os, os.path, hmac +import base64, os, os.path, hmac, json from flask import make_response -import utils, totp -from mailconfig import validate_login, get_mail_password, get_mail_user_privileges, get_mfa_state +import utils +from mailconfig import validate_login, get_mail_password, get_mail_user_privileges +from mfa import get_mfa_state, validate_auth_mfa DEFAULT_KEY_PATH = '/var/lib/mailinabox/api.key' DEFAULT_AUTH_REALM = 'Mail-in-a-Box Management Server' @@ -72,40 +73,29 @@ class KeyAuthService: if username in (None, ""): raise ValueError("Authorization header invalid.") elif username == self.key: - # The user passed the API key which grants administrative privs. + # The user passed the master API key which grants administrative privs. return (None, ["admin"]) else: - # The user is trying to log in with a username and user-specific - # API key or password. Raises or returns privs and an indicator - # whether the user is using their password or a user-specific API-key. - privs, is_user_key = self.get_user_credentials(username, password, env) + # The user is trying to log in with a username and either a password + # (and possibly a MFA token) or a user-specific API key. + return (username, self.check_user_auth(username, password, request, env)) - # If the user is using their API key to login, 2FA has been passed before - if is_user_key: - return (username, privs) - - totp_strategy = totp.TOTPStrategy(email=username) - # this will raise `totp.MissingTokenError` or `totp.BadTokenError` for bad requests - totp_strategy.validate_request(request, env) - - return (username, privs) - - def get_user_credentials(self, email, pw, env): - # Validate a user's credentials. On success returns a list of - # privileges (e.g. [] or ['admin']). On failure raises a ValueError - # with a login error message. + def check_user_auth(self, email, pw, request, env): + # Validate a user's login email address and password. If MFA is enabled, + # check the MFA token in the X-Auth-Token header. + # + # On success returns a list of privileges (e.g. [] or ['admin']). On login + # failure, raises a ValueError with a login error message. # Sanity check. if email == "" or pw == "": raise ValueError("Enter an email address and password.") - is_user_key = False - # The password might be a user-specific API key. create_user_key raises # a ValueError if the user does not exist. if hmac.compare_digest(self.create_user_key(email, env), pw): # OK. - is_user_key = True + pass else: # Get the hashed password of the user. Raise a ValueError if the # email address does not correspond to a user. @@ -113,6 +103,12 @@ class KeyAuthService: # Login failed. raise ValueError("Invalid password.") + # If MFA is enabled, check that MFA passes. + status, hints = validate_auth_mfa(email, request, env) + if not status: + # Login valid. Hints may have more info. + raise ValueError(",".join(hints)) + # Get privileges for authorization. This call should never fail because by this # point we know the email address is a valid user. But on error the call will # return a tuple of an error message and an HTTP status code. @@ -120,25 +116,29 @@ class KeyAuthService: if isinstance(privs, tuple): raise ValueError(privs[0]) # Return a list of privileges. - return (privs, is_user_key) + return privs def create_user_key(self, email, env): - # Store an HMAC with the client. The hashed message of the HMAC will be the user's - # email address & hashed password and the key will be the master API key. If TOTP - # is active, the key will also include the TOTP secret. The user of course has their - # own email address and password. We assume they do not have the master API key - # (unless they are trusted anyway). The HMAC proves that they authenticated with us - # in some other way to get the HMAC. Including the password means that when - # a user's password is reset, the HMAC changes and they will correctly need to log - # in to the control panel again. This method raises a ValueError if the user does - # not exist, due to get_mail_password. + # Create a user API key, which is a shared secret that we can re-generate from + # static information in our database. The shared secret contains the user's + # email address, current hashed password, and current MFA state, so that the + # key becomes invalid if any of that information changes. + # + # Use an HMAC to generate the API key using our master API key as a key, + # which also means that the API key becomes invalid when our master API key + # changes --- i.e. when this process is restarted. + # + # Raises ValueError via get_mail_password if the user doesn't exist. + + # Construct the HMAC message from the user's email address and current password. msg = b"AUTH:" + email.encode("utf8") + b" " + ";".join(get_mail_password(email, env)).encode("utf8") - mfa_state = get_mfa_state(email, env) + + # Add to the message the current MFA state, which is a list of MFA information. + # Turn it into a string stably. + msg += b" " + json.dumps(get_mfa_state(email, env), sort_keys=True).encode("utf8") + + # Make the HMAC. hash_key = self.key.encode('ascii') - - if mfa_state['type'] == 'totp': - hash_key = hash_key + mfa_state['secret'].encode('ascii') - return hmac.new(hash_key, msg, digestmod="sha256").hexdigest() def _generate_key(self): diff --git a/management/daemon.py b/management/daemon.py index 1ef9ee70..8b9bb277 100755 --- a/management/daemon.py +++ b/management/daemon.py @@ -5,11 +5,12 @@ from functools import wraps from flask import Flask, request, render_template, abort, Response, send_from_directory, make_response -import auth, utils, totp +import auth, utils, mfa from mailconfig import get_mail_users, get_mail_users_ex, get_admins, add_mail_user, set_mail_password, set_mail_display_name, remove_mail_user from mailconfig import get_mail_user_privileges, add_remove_mail_user_privilege from mailconfig import get_mail_aliases, get_mail_aliases_ex, get_mail_domains, add_mail_alias, remove_mail_alias -from mailconfig import get_mfa_state, create_totp_credential, delete_totp_credential +from mfa import get_mfa_state, enable_mfa, disable_mfa +import mfa_totp env = utils.load_environment() @@ -36,30 +37,31 @@ app = Flask(__name__, template_folder=os.path.abspath(os.path.join(os.path.dirna def authorized_personnel_only(viewfunc): @wraps(viewfunc) def newview(*args, **kwargs): - # Authenticate the passed credentials, which is either the API key or a username:password pair. + # Authenticate the passed credentials, which is either the API key or a username:password pair + # and an optional X-Auth-Token token. error = None privs = [] try: email, privs = auth_service.authenticate(request, env) - - except totp.MissingTokenError as e: - error = str(e) - except totp.BadTokenError as e: - # Write a line in the log recording the failed login - log_failed_login(request) - error = str(e) except ValueError as e: # Write a line in the log recording the failed login log_failed_login(request) + # Authentication failed. - error = "Incorrect username or password" + error = str(e) # Authorized to access an API view? if "admin" in privs: + # Store the email address of the logged in user so it can be accessed + # from the API methods that affect the calling user. + request.user_email = email + request.user_privs = privs + # Call view func. return viewfunc(*args, **kwargs) - elif not error: + + if not error: error = "You are not an administrator." # Not authorized. Return a 401 (send auth) and a prompt to authorize by default. @@ -126,28 +128,19 @@ def me(): # Is the caller authorized? try: email, privs = auth_service.authenticate(request, env) - except totp.MissingTokenError as e: - return json_response({ - "status": "missing_token", - "reason": str(e), - }) - except totp.BadTokenError as e: - # Log the failed login - log_failed_login(request) - - return json_response({ - "status": "bad_token", - "reason": str(e), - }) - except ValueError as e: - # Log the failed login - log_failed_login(request) - - return json_response({ - "status": "invalid", - "reason": "Incorrect username or password", - }) + if "missing-totp-token" in str(e): + return json_response({ + "status": "missing-totp-token", + "reason": str(e), + }) + else: + # Log the failed login + log_failed_login(request) + return json_response({ + "status": "invalid", + "reason": str(e), + }) resp = { "status": "ok", @@ -418,47 +411,34 @@ def ssl_provision_certs(): @app.route('/mfa/status', methods=['GET']) @authorized_personnel_only -def two_factor_auth_get_status(): - email, _ = auth_service.authenticate(request, env) - - mfa_state = get_mfa_state(email, env) - - if mfa_state['type'] == 'totp': - return json_response({ "type": 'totp' }) - - secret = totp.get_secret() - secret_url = totp.get_otp_uri(secret, email) - secret_qr = totp.get_qr_code(secret_url) - +def mfa_get_status(): return json_response({ - "type": None, - "totp_secret": secret, - "totp_qr": secret_qr + "enabled_mfa": get_mfa_state(request.user_email, env), + "new_mfa": { + "totp": mfa_totp.provision(request.user_email, env) + } }) @app.route('/mfa/totp/enable', methods=['POST']) @authorized_personnel_only def totp_post_enable(): - email, _ = auth_service.authenticate(request, env) - secret = request.form.get('secret') token = request.form.get('token') - - if type(secret) != str or type(token) != str or len(token) != 6 or len(secret) != 32: + label = request.form.get('label') + if type(token) != str: return json_response({ "error": 'bad_input' }, 400) + try: + mfa_totp.validate_secret(secret) + enable_mfa(request.user_email, "totp", secret, token, label, env) + except ValueError as e: + return str(e) + return "OK" - if totp.validate(secret, token): - create_totp_credential(email, secret, env) - return json_response({}) - - return json_response({ "error": 'token_mismatch' }, 400) - -@app.route('/mfa/totp/disable', methods=['POST']) +@app.route('/mfa/disable', methods=['POST']) @authorized_personnel_only def totp_post_disable(): - email, _ = auth_service.authenticate(request, env) - delete_totp_credential(email, env) - return json_response({}) + disable_mfa(request.user_email, request.form.get('mfa-id'), env) + return "OK" # WEB diff --git a/management/mailconfig.py b/management/mailconfig.py index e3b1f2f8..df32acac 100755 --- a/management/mailconfig.py +++ b/management/mailconfig.py @@ -1129,75 +1129,6 @@ def get_required_aliases(env): return aliases -# multi-factor auth - -def get_mfa_state(email, env): - # find the user - conn = open_database(env) - user = find_mail_user(env, email, ['objectClass','totpSecret','totpMruToken'], conn) - if user is None or 'totpUser' not in user['objectClass']: - return { 'type': None } - - secret = user['totpSecret'][0] - mru_token = None - if len(user['totpMruToken'])>0: - mru_token = user['totpMruToken'][0] - - return { - 'type': 'totp', - 'secret': secret, - 'mru_token': '' if mru_token is None else mru_token - } - -def create_totp_credential(email, secret, env): - validate_totp_secret(secret) - conn = open_database(env) - user = find_mail_user(env, email, ['objectClass','totpSecret'], conn) - if user is None: - return ("That's not a user (%s)." % email, 400) - - attrs = { - "totpSecret": secret, - } - if 'totpUser' not in user['objectClass']: - attrs['objectClass'] = user['objectClass'].copy() - attrs['objectClass'].append('totpUser') - conn.add_or_modify(user['dn'], user, attrs.keys(), None, attrs) - return "OK" - -def set_mru_totp_code(email, token, env): - conn = open_database(env) - user = find_mail_user(env, email, ['objectClass','totpMruToken'], conn) - if user is None: - return ("That's not a user (%s)." % email, 400) - - if 'totpUser' not in user['objectClass']: - return ("User (%s) not configured for TOTP" % email, 400) - - attrs = { - "totpMruToken": token - } - conn.add_or_modify(user['dn'], user, attrs.keys(), None, attrs) - return "OK" - -def delete_totp_credential(email, env): - conn = open_database(env) - user = find_mail_user(env, email, ['objectClass','totpSecret','totpMruToken'], conn) - if user is None: - return ("That's not a user (%s)." % email, 400) - - if 'totpUser' not in user['objectClass']: - return "OK" - - attrs = { - "totpMruToken": None, - "totpSecret": None, - "objectClass": user["objectClass"].copy() - } - attrs["objectClass"].remove("totpUser") - conn.add_or_modify(user['dn'], user, attrs.keys(), None, attrs) - return "OK" - def kick(env, mail_result=None): results = [] @@ -1259,12 +1190,6 @@ def validate_password(pw): if len(pw) < 8: raise ValueError("Passwords must be at least eight characters.") -def validate_totp_secret(secret): - if type(secret) != str or secret.strip() == "": - raise ValueError("No secret provided.") - if len(secret) != 32: - raise ValueError("Secret should be a 32 characters base32 string") - if __name__ == "__main__": import sys if len(sys.argv) > 2 and sys.argv[1] == "validate-email": diff --git a/management/mfa.py b/management/mfa.py new file mode 100644 index 00000000..5959ee98 --- /dev/null +++ b/management/mfa.py @@ -0,0 +1,113 @@ +# -*- indent-tabs-mode: t; tab-width: 4; python-indent-offset: 4; -*- + +from mailconfig import open_database, find_mail_user +import mfa_totp + +def strip_order_prefix(rec, attributes): + '''strip the order prefix from X-ORDERED ldap values for the + list of attributes specified + + `rec` is modified in-place + + the server returns X-ORDERED values in-order so the values will be + correctly orded in the record. + + For example, the function will change: + totpSecret: {0}secret1 + totpSecret: {1}secret2 + to: + totpSecret: secret1 + totpSecret: secret2 + + TODO: move to backend.py and/or integrate with LdapConnection.search() + ''' + for attr in attributes: + # ignore attribute that doesn't exist + if not attr in rec: continue + # ..as well as None values and empty list + if not rec[attr]: continue + + newvals = [] + for val in rec[attr]: + i = val.find('}') + if i>=0: newvals.append(val[i+1:]) + rec[attr] = newvals + +def get_mfa_user(email, env, conn=None): + '''get the ldap record for the user + + ''' + user = find_mail_user(env, email, ['objectClass','totpSecret','totpMruToken','totpLabel'], conn) + if not user: + raise ValueError("User does not exist.") + strip_order_prefix(user, ['totpSecret','totpMruToken','totpLabel']) + return user + + + +def get_mfa_state(email, env): + '''return details about what MFA schemes are enabled for a user + ordered by the priority that the scheme will be tried, with index + zero being the first. + + ''' + user = get_mfa_user(email, env) + state_list = [] + state_list += mfa_totp.get_state(user) + return state_list + +def enable_mfa(email, type, secret, token, label, env): + '''enable MFA using the scheme specified in `type`. users may have +multiple mfa schemes enabled of the same type. + + ''' + user = get_mfa_user(email, env) + if type == "totp": + mfa_totp.enable(user, secret, token, label, env) + else: + raise ValueError("Invalid MFA type.") + +def disable_mfa(email, mfa_id, env): + '''disable a specific MFA scheme. `mfa_id` identifies the specific + entry and is available in the `id` field of an item in the list + obtained from get_mfa_state() + + ''' + user = get_mfa_user(email, env) + if mfa_id is None: + # Disable all MFA for a user. + mfa_totp.disable(user, None, env) + + elif mfa_id.startswith("totp:"): + # Disable a particular MFA mode for a user. + mfa_totp.disable(user, mfa_id, env) + +def validate_auth_mfa(email, request, env): + # Validates that a login request satisfies any MFA modes + # that have been enabled for the user's account. Returns + # a tuple (status, [hints]). status is True for a successful + # MFA login, False for a missing token. If status is False, + # hints is an array of codes that indicate what the user + # can try. Possible codes are: + # "missing-totp-token" + # "invalid-totp-token" + + mfa_state = get_mfa_state(email, env) + + # If no MFA modes are added, return True. + if len(mfa_state) == 0: + return (True, []) + + # Try the enabled MFA modes. + hints = set() + for mfa_mode in mfa_state: + if mfa_mode["type"] == "totp": + user = get_mfa_user(email, env) + result, hint = mfa_totp.validate(user, mfa_mode, request, True, env) + if not result: + hints.add(hint) + else: + return (True, []) + + # On a failed login, indicate failure and any hints for what the user can do instead. + return (False, list(hints)) diff --git a/management/mfa_totp.py b/management/mfa_totp.py new file mode 100644 index 00000000..c48196db --- /dev/null +++ b/management/mfa_totp.py @@ -0,0 +1,165 @@ +# -*- indent-tabs-mode: t; tab-width: 4; python-indent-offset: 4; -*- +import hashlib +import base64 +import hmac +import pyotp +import qrcode +import io +import os + +from mailconfig import open_database + +def totp_id_from_index(user, index): + '''return the sha-256 hash of the corresponding totpSecret as the + unique id for the totp entry. use the hash and not the index + itself to ensure a change in the totp order does not cause an + unexpected change + + ''' + m = hashlib.sha256() + m.update(user['totpSecret'][index].encode("utf8")) + return 'totp:' + m.hexdigest() + +def totp_index_from_id(user, id): + '''return the index of the corresponding id from the list of totp + entries for a user, or -1 if not found + + ''' + for index in range(0, len(user['totpSecret'])): + xid = totp_id_from_index(user, index) + if xid == id: + return index + return -1 + +def get_state(user): + state_list = [] + + # totp + for idx in range(0, len(user['totpSecret'])): + state_list.append({ + 'id': totp_id_from_index(user, idx), + 'type': 'totp', + 'secret': user['totpSecret'][idx], + 'mru_token': user['totpMruToken'][idx], + 'label': user['totpLabel'][idx] + }) + return state_list + +def enable(user, secret, token, label, env): + validate_secret(secret) + # Sanity check with the provide current token. + totp = pyotp.TOTP(secret) + if not totp.verify(token, valid_window=1): + raise ValueError("Invalid token.") + + mods = { + "totpSecret": user['totpSecret'].copy() + [secret], + "totpMruToken": user['totpMruToken'].copy() + [''], + "totpLabel": user['totpLabel'].copy() + [label or ''] + } + if 'totpUser' not in user['objectClass']: + mods['objectClass'] = user['objectClass'].copy() + ['totpUser'] + + conn = open_database(env) + conn.modify_record(user, mods) + +def set_mru_token(user, id, token, env): + # return quietly if the user is not configured for TOTP + if 'totpUser' not in user['objectClass']: return + + # ensure the id is valid + idx = totp_index_from_id(user, id) + if idx<0: + raise ValueError('MFA/totp mru index is out of range') + + # store the token + mods = { "totpMruToken": user['totpMruToken'].copy() } + mods['totpMruToken'][idx] = token + conn = open_database(env) + conn.modify_record(user, mods) + + +def disable(user, id, env): + # Disable a particular MFA mode for a user. + if id is None: + # Disable all totp + mods = { + "objectClass": user["objectClass"].copy(), + "totpMruToken": None, + "totpSecret": None, + "totpLabel": None + } + mods["objectClass"].remove("totpUser") + open_database(env).modify_record(user, mods) + + else: + # Disable totp at index specified + idx = totp_index_from_id(user, id) + if idx<0 or idx>=len(user['totpSecret']): + raise ValueError('MFA/totp mru index is out of range') + mods = { + "objectClass": user["objectClass"].copy(), + "totpMruToken": user["totpMruToken"].copy(), + "totpSecret": user["totpSecret"].copy(), + "totpLabel": user["totpLabel"].copy() + } + mods["totpMruToken"].pop(idx) + mods["totpSecret"].pop(idx) + mods["totpLabel"].pop(idx) + if len(mods["totpSecret"])==0: + mods['objectClass'].remove('totpUser') + open_database(env).modify_record(user, mods) + + +def validate_secret(secret): + if type(secret) != str or secret.strip() == "": + raise ValueError("No secret provided.") + if len(secret) != 32: + raise ValueError("Secret should be a 32 characters base32 string") + +def provision(email, env): + # Make a new secret. + secret = base64.b32encode(os.urandom(20)).decode('utf-8') + validate_secret(secret) # sanity check + + # Make a URI that we encode within a QR code. + uri = pyotp.TOTP(secret).provisioning_uri( + name=email, + issuer_name=env["PRIMARY_HOSTNAME"] + " Mail-in-a-Box Control Panel" + ) + + # Generate a QR code as a base64-encode PNG image. + qr = qrcode.make(uri) + byte_arr = io.BytesIO() + qr.save(byte_arr, format='PNG') + png_b64 = base64.b64encode(byte_arr.getvalue()).decode('utf-8') + + return { + "type": "totp", + "secret": secret, + "qr_code_base64": png_b64 + } + + +def validate(user, state, request, save_mru, env): + # Check that a token is present in the X-Auth-Token header. + # If not, give a hint that one can be supplied. + token = request.headers.get('x-auth-token') + if not token: + return (False, "missing-totp-token") + + # Check for a replay attack. + if hmac.compare_digest(token, state['mru_token'] or ""): + # If the token fails, skip this MFA mode. + return (False, "invalid-totp-token") + + # Check the token. + totp = pyotp.TOTP(state["secret"]) + if not totp.verify(token, valid_window=1): + return (False, "invalid-totp-token") + + # On success, record the token to prevent a replay attack. + if save_mru: + set_mru_token(user, state['id'], token, env) + + return (True, None) diff --git a/management/templates/index.html b/management/templates/index.html index 8fdb7c22..12f6ad8e 100644 --- a/management/templates/index.html +++ b/management/templates/index.html @@ -93,16 +93,18 @@
When two-factor authentication is enabled, you will be prompted to enter a six digit code from an +authenticator app (usually on your phone) when you log into this control panel.
+ +