1
0
mirror of https://github.com/mail-in-a-box/mailinabox.git synced 2025-04-19 02:42:15 +00:00

Add token authentication for munin routes

This commit is contained in:
NewbieOrange 2021-07-28 00:39:48 +08:00
parent 41011f8639
commit 405860cac5
3 changed files with 33 additions and 7 deletions

View File

@ -1,6 +1,6 @@
import base64, os, os.path, hmac, json
import base64, os, os.path, hmac, json, secrets
from flask import make_response
from expiringdict import ExpiringDict
import utils
from mailconfig import get_mail_password, get_mail_user_privileges
@ -16,6 +16,8 @@ class KeyAuthService:
requests. The key is passed as the username field in the standard HTTP
Basic Auth header.
"""
token_dict = ExpiringDict(max_len=1024, max_age_seconds=600)
def __init__(self):
self.auth_realm = DEFAULT_AUTH_REALM
self.key = self._generate_key()
@ -74,11 +76,16 @@ class KeyAuthService:
raise ValueError("Authorization header invalid.")
elif username == self.key:
# The user passed the master API key which grants administrative privs.
return (None, ["admin"])
return (None, ["admin"], None)
else:
# The user is trying to log in with a username and either a password
# (and possibly a MFA token) or a user-specific API key.
return (username, self.check_user_auth(username, password, request, env))
token = None
privs = self.check_user_auth(username, password, request, env)
if not self.validate_user_token(username, request, env):
token = secrets.token_hex(16)
KeyAuthService.token_dict[username] = token
return (username, privs, token)
def check_user_auth(self, email, pw, request, env):
# Validate a user's login email address and password. If MFA is enabled,
@ -130,6 +137,14 @@ class KeyAuthService:
# Return a list of privileges.
return privs
def check_user_token(self, email, token, request, env):
# Check whether a token matches the one we stored for the user.
return token is not None and KeyAuthService.token_dict.get(email) == token
def validate_user_token(self, email, request, env):
# Check whether the provided token in request cookie matches the one we stored for the user.
return self.check_user_token(email, request.cookies.get("token"), request, env)
def create_user_key(self, email, env):
# Create a user API key, which is a shared secret that we can re-generate from
# static information in our database. The shared secret contains the user's

View File

@ -51,7 +51,7 @@ def authorized_personnel_only(viewfunc):
privs = []
try:
email, privs = auth_service.authenticate(request, env)
email, privs, _ = auth_service.authenticate(request, env)
except ValueError as e:
# Write a line in the log recording the failed login
log_failed_login(request)
@ -135,7 +135,7 @@ def index():
def me():
# Is the caller authorized?
try:
email, privs = auth_service.authenticate(request, env)
email, privs, token = auth_service.authenticate(request, env)
except ValueError as e:
if "missing-totp-token" in str(e):
return json_response({
@ -160,8 +160,13 @@ def me():
if "admin" in privs:
resp["api_key"] = auth_service.create_user_key(email, env)
resp = json_response(resp)
# Set authentication token for admin munin routes.
if "admin" in privs and token:
resp.set_cookie("token", value=token, secure=True, httponly=True, samesite='Lax')
# Return.
return json_response(resp)
return resp
# MAIL

View File

@ -110,6 +110,12 @@ def validate_auth_mfa(email, request, env):
if len(mfa_state) == 0:
return (True, [])
# Try token authentication first for munin routes.
if request.full_path.startswith("/munin"):
from daemon import auth_service
if auth_service.validate_user_token(email, request, env):
return (True, [])
# Try the enabled MFA modes.
hints = set()
for mfa_mode in mfa_state: