Extract TOTPStrategy class to totp.py
* this decouples `TOTP` validation and storage logic from `auth` and moves it to `totp` * reduce `pyotp.validate#valid_window` from `2` to `1`
This commit is contained in:
parent
6594e19a1f
commit
ce70f44c58
|
@ -4,17 +4,10 @@ from flask import make_response
|
||||||
|
|
||||||
import utils, totp
|
import utils, totp
|
||||||
from mailconfig import get_mail_password, get_mail_user_privileges
|
from mailconfig import get_mail_password, get_mail_user_privileges
|
||||||
from mailconfig import get_two_factor_info, set_two_factor_last_used_token
|
|
||||||
|
|
||||||
DEFAULT_KEY_PATH = '/var/lib/mailinabox/api.key'
|
DEFAULT_KEY_PATH = '/var/lib/mailinabox/api.key'
|
||||||
DEFAULT_AUTH_REALM = 'Mail-in-a-Box Management Server'
|
DEFAULT_AUTH_REALM = 'Mail-in-a-Box Management Server'
|
||||||
|
|
||||||
class MissingTokenError(ValueError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class BadTokenError(ValueError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class KeyAuthService:
|
class KeyAuthService:
|
||||||
"""Generate an API key for authenticating clients
|
"""Generate an API key for authenticating clients
|
||||||
|
|
||||||
|
@ -91,26 +84,10 @@ class KeyAuthService:
|
||||||
if is_user_key:
|
if is_user_key:
|
||||||
return (username, privs)
|
return (username, privs)
|
||||||
|
|
||||||
secret, last_token = get_two_factor_info(username, env)
|
totp_strategy = totp.TOTPStrategy(email=username)
|
||||||
|
# this will raise `totp.MissingTokenError` or `totp.BadTokenError` for bad requests
|
||||||
|
totp_strategy.validate_request(request, env)
|
||||||
|
|
||||||
# 2FA is not enabled, we can skip further checks
|
|
||||||
if secret == "" or secret == None:
|
|
||||||
return (username, privs)
|
|
||||||
|
|
||||||
# If 2FA is enabled, raise if:
|
|
||||||
# 1. no token is provided via `x-auth-token`
|
|
||||||
# 2. a previously supplied token is used (to counter replay attacks)
|
|
||||||
# 3. the token is invalid
|
|
||||||
# in that case, we need to raise and indicate to the client to supply a TOTP
|
|
||||||
token_header = request.headers.get('x-auth-token')
|
|
||||||
if token_header == None or token_header == "":
|
|
||||||
raise MissingTokenError("Two factor code missing (no x-auth-token supplied)")
|
|
||||||
|
|
||||||
# TODO: Should a token replay be handled as its own error?
|
|
||||||
if token_header == last_token or totp.validate(secret, token_header) != True:
|
|
||||||
raise BadTokenError("Two factor code incorrect")
|
|
||||||
|
|
||||||
set_two_factor_last_used_token(username, token_header, env)
|
|
||||||
return (username, privs)
|
return (username, privs)
|
||||||
|
|
||||||
def get_user_credentials(self, email, pw, env):
|
def get_user_credentials(self, email, pw, env):
|
||||||
|
|
|
@ -40,10 +40,10 @@ def authorized_personnel_only(viewfunc):
|
||||||
error = None
|
error = None
|
||||||
try:
|
try:
|
||||||
email, privs = auth_service.authenticate(request, env)
|
email, privs = auth_service.authenticate(request, env)
|
||||||
except auth.MissingTokenError as e:
|
except totp.MissingTokenError as e:
|
||||||
privs = []
|
privs = []
|
||||||
error = str(e)
|
error = str(e)
|
||||||
except auth.BadTokenError as e:
|
except totp.BadTokenError as e:
|
||||||
# Write a line in the log recording the failed login
|
# Write a line in the log recording the failed login
|
||||||
log_failed_login(request)
|
log_failed_login(request)
|
||||||
|
|
||||||
|
@ -128,7 +128,7 @@ def me():
|
||||||
# Is the caller authorized?
|
# Is the caller authorized?
|
||||||
try:
|
try:
|
||||||
email, privs = auth_service.authenticate(request, env)
|
email, privs = auth_service.authenticate(request, env)
|
||||||
except auth.MissingTokenError as e:
|
except totp.MissingTokenError as e:
|
||||||
# Log the failed login
|
# Log the failed login
|
||||||
log_failed_login(request)
|
log_failed_login(request)
|
||||||
|
|
||||||
|
@ -136,7 +136,7 @@ def me():
|
||||||
"status": "missing_token",
|
"status": "missing_token",
|
||||||
"reason": str(e),
|
"reason": str(e),
|
||||||
})
|
})
|
||||||
except auth.BadTokenError as e:
|
except totp.BadTokenError as e:
|
||||||
# Log the failed login
|
# Log the failed login
|
||||||
log_failed_login(request)
|
log_failed_login(request)
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import struct
|
||||||
import time
|
import time
|
||||||
import pyotp
|
import pyotp
|
||||||
import qrcode
|
import qrcode
|
||||||
|
from mailconfig import get_two_factor_info, set_two_factor_last_used_token
|
||||||
|
|
||||||
def get_secret():
|
def get_secret():
|
||||||
return base64.b32encode(os.urandom(20)).decode('utf-8')
|
return base64.b32encode(os.urandom(20)).decode('utf-8')
|
||||||
|
@ -30,4 +31,42 @@ def validate(secret, token):
|
||||||
@see https://tools.ietf.org/html/rfc4226#section-5.4
|
@see https://tools.ietf.org/html/rfc4226#section-5.4
|
||||||
"""
|
"""
|
||||||
totp = pyotp.TOTP(secret)
|
totp = pyotp.TOTP(secret)
|
||||||
return totp.verify(token, valid_window=2)
|
return totp.verify(token, valid_window=1)
|
||||||
|
|
||||||
|
class MissingTokenError(ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class BadTokenError(ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class TOTPStrategy():
|
||||||
|
def __init__(self, email):
|
||||||
|
self.type = 'totp'
|
||||||
|
self.email = email
|
||||||
|
|
||||||
|
def store_successful_login(self, token, env):
|
||||||
|
return set_two_factor_last_used_token(self.email, token, env)
|
||||||
|
|
||||||
|
def validate_request(self, request, env):
|
||||||
|
secret, mru_token = get_two_factor_info(self.email, env)
|
||||||
|
|
||||||
|
# 2FA is not enabled, we can skip further checks
|
||||||
|
if secret == "" or secret == None:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# If 2FA is enabled, raise if:
|
||||||
|
# 1. no token is provided via `x-auth-token`
|
||||||
|
# 2. a previously supplied token is used (to counter replay attacks)
|
||||||
|
# 3. the token is invalid
|
||||||
|
# in that case, we need to raise and indicate to the client to supply a TOTP
|
||||||
|
token_header = request.headers.get('x-auth-token')
|
||||||
|
|
||||||
|
if token_header == None or token_header == "":
|
||||||
|
raise MissingTokenError("Two factor code missing (no x-auth-token supplied)")
|
||||||
|
|
||||||
|
# TODO: Should a token replay be handled as its own error?
|
||||||
|
if token_header == mru_token or validate(secret, token_header) != True:
|
||||||
|
raise BadTokenError("Two factor code incorrect")
|
||||||
|
|
||||||
|
self.store_successful_login(token_header, env)
|
||||||
|
return True
|
||||||
|
|
Loading…
Reference in New Issue