parent
7d6427904f
commit
a8ea456b49
@ -0,0 +1,126 @@ |
||||
import base64 |
||||
import hmac |
||||
import io |
||||
import os |
||||
import pyotp |
||||
import qrcode |
||||
|
||||
from mailconfig import open_database |
||||
|
||||
def get_user_id(email, c): |
||||
c.execute('SELECT id FROM users WHERE email=?', (email,)) |
||||
r = c.fetchone() |
||||
if not r: raise ValueError("User does not exist.") |
||||
return r[0] |
||||
|
||||
def get_mfa_state(email, env): |
||||
c = open_database(env) |
||||
c.execute('SELECT id, type, secret, mru_token FROM mfa WHERE user_id=?', (get_user_id(email, c),)) |
||||
return [ |
||||
{ "id": r[0], "type": r[1], "secret": r[2], "mru_token": r[3] } |
||||
for r in c.fetchall() |
||||
] |
||||
|
||||
def enable_mfa(email, type, secret, token, env): |
||||
if type == "totp": |
||||
validate_totp_secret(secret) |
||||
# Sanity check with the provide current token. |
||||
totp = pyotp.TOTP(secret) |
||||
if not totp.verify(token, valid_window=1): |
||||
raise ValueError("Invalid token.") |
||||
else: |
||||
raise ValueError("Invalid MFA type.") |
||||
|
||||
conn, c = open_database(env, with_connection=True) |
||||
c.execute('INSERT INTO mfa (user_id, type, secret) VALUES (?, ?, ?)', (get_user_id(email, c), type, secret)) |
||||
conn.commit() |
||||
|
||||
def set_mru_token(email, token, env): |
||||
conn, c = open_database(env, with_connection=True) |
||||
c.execute('UPDATE mfa SET mru_token=? WHERE user_id=?', (token, get_user_id(email, c))) |
||||
conn.commit() |
||||
|
||||
def disable_mfa(email, mfa_id, env): |
||||
conn, c = open_database(env, with_connection=True) |
||||
if mfa_id is None: |
||||
# Disable all MFA for a user. |
||||
c.execute('DELETE FROM mfa WHERE user_id=?', (get_user_id(email, c),)) |
||||
else: |
||||
# Disable a particular MFA mode for a user. |
||||
c.execute('DELETE FROM mfa WHERE user_id=? AND id=?', (get_user_id(email, c), mfa_id)) |
||||
conn.commit() |
||||
|
||||
def validate_totp_secret(secret): |
||||
if type(secret) != str or secret.strip() == "": |
||||
raise ValueError("No secret provided.") |
||||
if len(secret) != 32: |
||||
raise ValueError("Secret should be a 32 characters base32 string") |
||||
|
||||
def provision_totp(email, env): |
||||
# Make a new secret. |
||||
secret = base64.b32encode(os.urandom(20)).decode('utf-8') |
||||
validate_totp_secret(secret) # sanity check |
||||
|
||||
# Make a URI that we encode within a QR code. |
||||
uri = pyotp.TOTP(secret).provisioning_uri( |
||||
name=email, |
||||
issuer_name=env["PRIMARY_HOSTNAME"] + " Mail-in-a-Box Control Panel" |
||||
) |
||||
|
||||
# Generate a QR code as a base64-encode PNG image. |
||||
qr = qrcode.make(uri) |
||||
byte_arr = io.BytesIO() |
||||
qr.save(byte_arr, format='PNG') |
||||
png_b64 = base64.b64encode(byte_arr.getvalue()).decode('utf-8') |
||||
|
||||
return { |
||||
"type": "totp", |
||||
"secret": secret, |
||||
"qr_code_base64": png_b64 |
||||
} |
||||
|
||||
def validate_auth_mfa(email, request, env): |
||||
# Validates that a login request satisfies any MFA modes |
||||
# that have been enabled for the user's account. Returns |
||||
# a tuple (status, [hints]). status is True for a successful |
||||
# MFA login, False for a missing token. If status is False, |
||||
# hints is an array of codes that indicate what the user |
||||
# can try. Possible codes are: |
||||
# "missing-totp-token" |
||||
# "invalid-totp-token" |
||||
|
||||
mfa_state = get_mfa_state(email, env) |
||||
|
||||
# If no MFA modes are added, return True. |
||||
if len(mfa_state) == 0: |
||||
return (True, []) |
||||
|
||||
# Try the enabled MFA modes. |
||||
hints = set() |
||||
for mfa_mode in mfa_state: |
||||
if mfa_mode["type"] == "totp": |
||||
# Check that a token is present in the X-Auth-Token header. |
||||
# If not, give a hint that one can be supplied. |
||||
token = request.headers.get('x-auth-token') |
||||
if not token: |
||||
hints.add("missing-totp-token") |
||||
continue |
||||
|
||||
# Check for a replay attack. |
||||
if hmac.compare_digest(token, mfa_mode['mru_token'] or ""): |
||||
# If the token fails, skip this MFA mode. |
||||
hints.add("invalid-totp-token") |
||||
continue |
||||
|
||||
# Check the token. |
||||
totp = pyotp.TOTP(mfa_mode["secret"]) |
||||
if not totp.verify(token, valid_window=1): |
||||
hints.add("invalid-totp-token") |
||||
continue |
||||
|
||||
# On success, record the token to prevent a replay attack. |
||||
set_mru_token(email, token, env) |
||||
return (True, []) |
||||
|
||||
# On a failed login, indicate failure and any hints for what the user can do instead. |
||||
return (False, list(hints)) |
@ -1,72 +0,0 @@ |
||||
import base64 |
||||
import hmac |
||||
import io |
||||
import os |
||||
import struct |
||||
import time |
||||
import pyotp |
||||
import qrcode |
||||
from mailconfig import get_mfa_state, set_mru_totp_code |
||||
|
||||
def get_secret(): |
||||
return base64.b32encode(os.urandom(20)).decode('utf-8') |
||||
|
||||
def get_otp_uri(secret, email): |
||||
return pyotp.TOTP(secret).provisioning_uri( |
||||
name=email, |
||||
issuer_name='mailinabox' |
||||
) |
||||
|
||||
def get_qr_code(data): |
||||
qr = qrcode.make(data) |
||||
byte_arr = io.BytesIO() |
||||
qr.save(byte_arr, format='PNG') |
||||
|
||||
encoded = base64.b64encode(byte_arr.getvalue()).decode('utf-8') |
||||
return 'data:image/png;base64,{}'.format(encoded) |
||||
|
||||
def validate(secret, token): |
||||
""" |
||||
@see https://tools.ietf.org/html/rfc6238#section-4 |
||||
@see https://tools.ietf.org/html/rfc4226#section-5.4 |
||||
""" |
||||
totp = pyotp.TOTP(secret) |
||||
return totp.verify(token, valid_window=1) |
||||
|
||||
class MissingTokenError(ValueError): |
||||
pass |
||||
|
||||
class BadTokenError(ValueError): |
||||
pass |
||||
|
||||
class TOTPStrategy(): |
||||
def __init__(self, email): |
||||
self.type = 'totp' |
||||
self.email = email |
||||
|
||||
def store_successful_login(self, token, env): |
||||
return set_mru_totp_code(self.email, token, env) |
||||
|
||||
def validate_request(self, request, env): |
||||
mfa_state = get_mfa_state(self.email, env) |
||||
|
||||
# 2FA is not enabled, we can skip further checks |
||||
if mfa_state['type'] != 'totp': |
||||
return True |
||||
|
||||
# If 2FA is enabled, raise if: |
||||
# 1. no token is provided via `x-auth-token` |
||||
# 2. a previously supplied token is used (to counter replay attacks) |
||||
# 3. the token is invalid |
||||
# in that case, we need to raise and indicate to the client to supply a TOTP |
||||
token_header = request.headers.get('x-auth-token') |
||||
|
||||
if not token_header: |
||||
raise MissingTokenError("Two factor code missing (no x-auth-token supplied)") |
||||
|
||||
# TODO: Should a token replay be handled as its own error? |
||||
if hmac.compare_digest(token_header, mfa_state['mru_token']) or validate(mfa_state['secret'], token_header) != True: |
||||
raise BadTokenError("Two factor code incorrect") |
||||
|
||||
self.store_successful_login(token_header, env) |
||||
return True |
Loading…
Reference in new issue